<h1>Database 2 Project - Clean and merge data from csv files</h1>
<h3>
    Group FRANGI: <br>
    Francesco Frigato, Andrea Felline, Gianluca Antolini <br>
    <br>
    Topic: <br>
    Spotify songs and their Youtube videos
</h3>

<h4>Import required libraries</h4>

In [1]:
import csv
import os
from pathlib import Path

<h4>Set global variables</h4>
<ol>
    <li>Path to the csv dataset</li>
    <li>Dictionaries that help fix some issues in the datasets</li>
</ol>

In [7]:
# Parent of the current working directory, as an absolute path string
path = str(Path(os.path.abspath(os.getcwd())).parent.absolute())

# Folders (relative to `path`) holding the source CSVs and the generated CSV
originalDir = path + "/Datasets/Original/"
computedDir = path + "/Datasets/Computed/"

# Every CSV is opened the same way: no newline translation (as the csv module
# expects) and UTF-8 text
csvOpenOptions = {"newline": "", "encoding": "utf-8"}

# input
fileSY = open(originalDir + "Spotify_Youtube.csv", "r", **csvOpenOptions)
fileTF = open(originalDir + "spotify_songs.csv", "r", **csvOpenOptions)

# output
fileCD = open(computedDir + "complete_dataset.csv", "w", **csvOpenOptions)

# Dicts for data cleaning
songsDict = dict()  # song Uri -> number of streams kept for that song
videosDict = dict()  # video url -> [comments, likes, views, official_video]

# Dicts for data merging
channelNamesIds = dict()  # channel name -> retrieved or generated channel ID
albumNamesIds = dict()  # (album name, album type) -> retrieved or generated album ID

<h4>Function to fix some issues on the data of the Spotify-Youtube dataset</h4>
In particular, if a song appears more than once (for example because two artists made it, and there is a row for each one):

 - Sometimes the number of streams differs between rows (probably because the rows were collected at different times), so we take the highest value
 - Sometimes the number of comments, likes and views, or the officiality of the video, differs between rows, so we take the last value (the most recent)

In [3]:
def updateLine(rowSY):
    """Fold one Spotify-Youtube row into the global cleaning dictionaries.

    Updates two module-level dicts in place:

    * ``songsDict[Uri]`` keeps the highest ``Stream`` value (as ``int``)
      seen so far for the song.
    * ``videosDict[Url_youtube]`` keeps ``[Comments, Likes, Views,
      official_video]``; for a video that was already seen, each entry is
      replaced only when the new row carries a non-empty value, so the last
      non-empty value wins.

    Args:
        rowSY: mapping for one row of the Spotify-Youtube CSV; must contain
            the keys ``Stream``, ``Uri``, ``Url_youtube``, ``Comments``,
            ``Likes``, ``Views`` and ``official_video``.

    Raises:
        ValueError: if ``Stream`` is non-empty but cannot be parsed by ``int``.
    """
    # Keep the highest number of streams for each song
    streamText = rowSY["Stream"]
    if streamText != "" and streamText is not None:
        streams = int(streamText)
        uri = rowSY["Uri"]
        if uri not in songsDict or streams > songsDict[uri]:
            songsDict[uri] = streams

    # Keep the last non-empty value for comments, likes, views and officiality.
    # The lookup must use the video url, which is the key of videosDict.
    # Testing "official_video" here (as the code used to) never matched a key,
    # so every duplicate row overwrote the stored values, empty ones included.
    url = rowSY["Url_youtube"]
    rowStats = [
        rowSY["Comments"],
        rowSY["Likes"],
        rowSY["Views"],
        rowSY["official_video"],
    ]
    if url not in videosDict:
        videosDict[url] = rowStats
    else:
        storedStats = videosDict[url]
        for position, value in enumerate(rowStats):
            if value != "" and value is not None:
                storedStats[position] = value

<h4>Function to merge the two datasets into one</h4>

Writes a row with data from both datasets

In [4]:
def writeLine(writerCD, rowSY, rowTF):
    """Write one merged output row built from both datasets.

    Besides the columns copied from the two input rows, the output row gets a
    channel ID and an album ID (looked up in, or newly added to, the global
    ``channelNamesIds`` / ``albumNamesIds`` dicts) and the cleaned values
    stored in the global ``songsDict`` / ``videosDict`` dicts.

    Args:
        writerCD: object whose ``writerow`` method receives the output dict.
        rowSY: mapping for the Spotify-Youtube row.
        rowTF: mapping for the matching row of the second dataset, or ``None``
            when there is no match (its columns are then written as "").
    """
    # Retrieve or create the channel ID ("" when the row has no channel)
    channelId = ""
    channelName = rowSY["Channel"]
    if channelName not in ("", None):
        if channelName not in channelNamesIds:
            # new id: "channel" + number of channels registered so far
            channelNamesIds[channelName] = "channel" + str(len(channelNamesIds))
        channelId = channelNamesIds[channelName]

    # Retrieve or create the album ID ("" when the row has no album).
    # Albums are identified by (name, type); a missing type counts as "".
    albumId = ""
    albumName = rowSY["Album"]
    if albumName not in ("", None):
        albumType = rowSY["Album_type"]
        albumKey = (albumName, "" if albumType is None else albumType)
        if albumKey not in albumNamesIds:
            # new id: "album" + number of albums registered so far
            albumNamesIds[albumKey] = "album" + str(len(albumNamesIds))
        albumId = albumNamesIds[albumKey]

    # Columns copied unchanged from the Spotify-Youtube row
    columnsSY = (
        "Track",
        "Artist",
        "Url_spotify",
        "Album",
        "Album_type",
        "Uri",
        "Danceability",
        "Energy",
        "Key",
        "Loudness",
        "Speechiness",
        "Acousticness",
        "Instrumentalness",
        "Liveness",
        "Valence",
        "Tempo",
        "Duration_ms",
        "Stream",
        "Url_youtube",
        "Title",
        "Channel",
        "Views",
        "Likes",
        "Comments",
        "Description",
        "Licensed",
        "official_video",
    )
    # Columns taken from the other file, left empty when there is no match
    columnsTF = (
        "track_album_release_date",
        "track_album_id",
        "playlist_name",
        "playlist_id",
        "playlist_genre",
        "playlist_subgenre",
    )

    mergedRow = {column: rowSY[column] for column in columnsSY}
    for column in columnsTF:
        mergedRow[column] = rowTF[column] if rowTF is not None else ""

    # generated ids
    mergedRow["channelId"] = channelId
    mergedRow["albumId"] = albumId

    # updated stream value ("" when the song was never registered)
    mergedRow["uStream"] = songsDict.get(rowSY["Uri"], "")

    # updated comments, likes, views and official_video values
    # (all "" when the video was never registered)
    videoStats = videosDict.get(rowSY["Url_youtube"], ("", "", "", ""))
    mergedRow["uComments"] = videoStats[0]
    mergedRow["uLikes"] = videoStats[1]
    mergedRow["uViews"] = videoStats[2]
    mergedRow["uOfficial_video"] = videoStats[3]

    writerCD.writerow(mergedRow)

<h4>Fix the Spotify-Youtube dataset</h4>

In [5]:
# First pass over the Spotify-Youtube file: feed every row to updateLine so the
# cleaning dictionaries are filled before the merge.
# Rewind first: if this cell is re-run after the file has already been read,
# the reader would otherwise start at end-of-file and process no rows.
fileSY.seek(0)
readerSY = csv.DictReader(fileSY)
for rowSY in readerSY:
    updateLine(rowSY)

<h4>Merge the two datasets</h4>

In [6]:
# Read the csv files.
# Both inputs are rewound (the cleaning pass already consumed fileSY) and the
# output is emptied, so that re-running this cell rewrites the same file
# instead of reading nothing or appending a second header.
fileSY.seek(0)
fileTF.seek(0)
fileCD.seek(0)
fileCD.truncate()
readerSY = csv.DictReader(fileSY)
readerTF = csv.DictReader(fileTF)

# Set fields names
fieldNamesCD = [
    # here start all the fields from Spotify_Youtube.csv
    "Track",
    "Artist",
    "Url_spotify",
    "Album",
    "Album_type",
    "Uri",
    "Danceability",
    "Energy",
    "Key",
    "Loudness",
    "Speechiness",
    "Acousticness",
    "Instrumentalness",
    "Liveness",
    "Valence",
    "Tempo",
    "Duration_ms",
    "Stream",
    "Url_youtube",
    "Title",
    "Channel",
    "Views",
    "Likes",
    "Comments",
    "Description",
    "Licensed",
    "official_video",
    # here start the chosen fields from spotify_songs.csv
    "track_album_release_date",
    "track_album_id",
    "playlist_name",
    "playlist_id",
    "playlist_genre",
    "playlist_subgenre",
    # additional channel id
    "channelId",
    # additional album id
    "albumId",
    # cleaned values computed by updateLine
    "uStream",
    "uComments",
    "uLikes",
    "uViews",
    "uOfficial_video",
]
writerCD = csv.DictWriter(fileCD, fieldnames=fieldNamesCD)

# Write the header row to the output CSV file
writerCD.writeheader()

# Index the spotify_songs rows by track id (a dict lookup per song is faster
# than rescanning the file). If a track_id appears more than once, the last
# row read is the one kept.
TFDict = {}
for rowTF in readerTF:
    TFDict[rowTF["track_id"]] = rowTF

# Fix and write each line
for rowSY in readerSY:
    # Take only the spotify id
    # (e.g. rowSY["Uri"] = "spotify:track:0d28khcov6AiegSCpG5TuT").
    # An empty, missing or malformed Uri has no third component: such a row is
    # still written, just without the spotify_songs fields, instead of
    # aborting the whole merge with an IndexError/AttributeError.
    uriParts = (rowSY["Uri"] or "").split(":")
    trackIdSY = uriParts[2] if len(uriParts) > 2 else None

    # Write composed line (None when there is no matching spotify_songs row)
    matchTF = TFDict.get(trackIdSY) if trackIdSY is not None else None
    writeLine(writerCD, rowSY, matchTF)

<h4>Close files</h4>

In [8]:
# Release the two input files and the output file, in the order they were opened
for openedFile in (fileSY, fileTF, fileCD):
    openedFile.close()