## Read and merge all files

In [0]:
# Import
import re
from pyspark.sql.functions import lit, udf, lower, initcap, col
from pyspark.sql.types import IntegerType

# CSV options
file_type = "csv"
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The Databricks directory where the files are stored.
# NOTE(review): `dir` shadows the Python builtin of the same name; the name is
# kept here because other cells may reference it.
dir = dbutils.fs.ls('/FileStore/tables/')


def _load_listener_file(file_info):
    """Read one file of the listing and tag its rows with the listener name.

    `file_info` is one entry of the `dbutils.fs.ls` listing. Its `path` is
    loaded with the CSV options defined above, and its `name` minus the last
    four characters (the ".csv" extension of the listed files) is written to
    a new 'Auditeur' column.
    """
    listener_df = (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(file_info.path)
    )
    return listener_df.withColumn('Auditeur', lit(file_info.name[:-4]))


# Load the files and concatenate them in a single dataframe.
# `df` is reset first so that a stale dataframe from a previous run of this
# cell can never be mistaken for the result of the current run.
df = None
for file_info in dir:
    print(file_info)
    df2 = _load_listener_file(file_info)
    if df is None:
        # The first file seeds the merged dataframe.
        df = df2
        continue
    # Files without any row are not merged into the result.
    if df2.count() > 0:
        df = df.union(df2).distinct()
    # Progress report: (row count, column count) of the merged dataframe.
    print((df.count(), len(df.columns)))

if df is None:
    # Fail here with a clear message rather than later with a NameError.
    raise FileNotFoundError("No file found in /FileStore/tables/")

# Give the default `_cN` columns their real names, replace missing albums
# with an empty string and discard the rows that have no date.
df = (
    df.withColumnRenamed('_c0', 'Artiste')
    .withColumnRenamed('_c1', 'Album')
    .withColumnRenamed('_c2', 'Morceau')
    .withColumnRenamed('_c3', 'Date')
    .na.fill({'Album': ''})
    .dropna(subset='Date')
)

def containeNoChinese(a):
    """Return 1 when `a` holds no character in U+4E00-U+9FFF, else 0.

    The result is an integer (not a boolean) so that the flags of several
    columns can be multiplied together: the product is 1 only when none of
    them contains such a character.

    `None` is treated as a value without Chinese characters and yields 1;
    passing it to `re.search` would otherwise raise a TypeError.
    """
    if a is None:
        return 1
    return 0 if re.search(u'[\u4e00-\u9fff]', a) else 1

# Wrap the Python check as a Spark UDF returning an integer flag
udf_containeNoChinese = udf(containeNoChinese, IntegerType())

# New column: product of the per-column flags, so it is 1 only when every
# one of the four columns is flagged 1 and 0 as soon as one of them is not.
df = df.withColumn(
    "excludeChineseCharacteres",
    udf_containeNoChinese("Artiste")
    * udf_containeNoChinese("Album")
    * udf_containeNoChinese("Morceau")
    * udf_containeNoChinese("Auditeur"),
)

# Normalise the text columns: everything is lowercased first (to avoid
# duplicated artist names, for example), then the first letter of each word
# is put back to uppercase.
df = (
    df.withColumn('Artiste', initcap(lower(col('Artiste'))))
    .withColumn('Album', initcap(lower(col('Album'))))
    .withColumn('Morceau', initcap(lower(col('Morceau'))))
)

# Display the dataframe
display(df)

FileInfo(path='dbfs:/FileStore/tables/AkaGambit.csv', name='AkaGambit.csv', size=399911, modificationTime=1672952979000)
FileInfo(path='dbfs:/FileStore/tables/DuckDAWorld.csv', name='DuckDAWorld.csv', size=4773408, modificationTime=1672953104000)
(75363, 5)
FileInfo(path='dbfs:/FileStore/tables/Hanhvunt2002.csv', name='Hanhvunt2002.csv', size=15382483, modificationTime=1672953162000)
(223136, 5)
FileInfo(path='dbfs:/FileStore/tables/HeyDottore.csv', name='HeyDottore.csv', size=832173, modificationTime=1672953134000)
(237135, 5)
FileInfo(path='dbfs:/FileStore/tables/JBloom91.csv', name='JBloom91.csv', size=3155204, modificationTime=1672952505000)
(279822, 5)
FileInfo(path='dbfs:/FileStore/tables/Jes_Jungkook97.csv', name='Jes_Jungkook97.csv', size=13162615, modificationTime=1672952576000)
(394901, 5)
FileInfo(path='dbfs:/FileStore/tables/Kleber_Fp17.csv', name='Kleber_Fp17.csv', size=9714874, modificationTime=1672952642000)
(428002, 5)
FileInfo(path='dbfs:/FileStore/tables/Lavark99.csv'

In [0]:
# Remove a table

# dbutils.fs.rm('/user/hive/warehouse/full')

In [0]:
# Display the current contents of the `df` dataframe in the cell output
display(df)