In [56]:
from pyspark.sql import SparkSession

In [57]:
spark = SparkSession.builder.appName("testSparkApp").getOrCreate()

In [58]:
artistFilePath= "D:\\Datasets\\famousPainting\\artist.csv"
artistDf = spark.read.csv(artistFilePath, header=True, inferSchema=True)

In [59]:
canvasSizePath = "D:\\Datasets\\famousPainting\\canvas_size.csv"
canvasSizeDf = spark.read.csv(canvasSizePath, header=True, inferSchema=True)
imageLinkPath = "D:\\Datasets\\famousPainting\\image_link.csv"
imageLinkDf = spark.read.csv(imageLinkPath, header=True, inferSchema=True)
museumPath = "D:\\Datasets\\famousPainting\\museum.csv"
museumDf = spark.read.csv(museumPath, header=True, inferSchema=True)
museumHoursPath = "D:\\Datasets\\famousPainting\\museum_hours.csv"
museumHoursDf = spark.read.csv(museumHoursPath, header=True, inferSchema=True)
productSizePath = "D:\\Datasets\\famousPainting\\product_size.csv"
productSizeDf = spark.read.csv(productSizePath, header=True, inferSchema=True)
subjectPath = "D:\\Datasets\\famousPainting\\subject.csv"
subjectDf = spark.read.csv(subjectPath, header=True, inferSchema=True)
workPath = "D:\\Datasets\\famousPainting\\work.csv"
workDf = spark.read.csv(workPath, header=True, inferSchema=True)

In [60]:
workDf.show()

+-------+--------------------+---------+-------+---------+
|work_id|                name|artist_id|  style|museum_id|
+-------+--------------------+---------+-------+---------+
| 160228|Still Life with F...|      615|Baroque|       43|
| 160236|Still Life with F...|      615|Baroque|       43|
| 160244|Still Life with F...|      615|Baroque|       43|
| 160252|Still Life with F...|      615|Baroque|       43|
| 160260|Still Life with F...|      615|Baroque|       43|
| 160268|The Overturned Bo...|      615|Baroque|       43|
| 125752|Arabian Horses at...|      757|Baroque|     NULL|
| 125818|Count Halm on His...|      757|Baroque|     NULL|
|  23448| Horses at the Porch|      757|Baroque|       34|
| 125763|Napoleon Before t...|      757|Baroque|     NULL|
| 125774|Peasants Resting ...|      757|Baroque|     NULL|
| 125785|Portrait Oberleut...|      757|Baroque|     NULL|
| 125796|The Rescue of Cou...|      757|Baroque|     NULL|
| 125807|     The Stable Yard|      757|Baroque|     NUL

In [61]:
#Paintings which are not displayed on any museums
paintingsNotInMuseumDf = workDf.filter(workDf["museum_id"].isNull())

In [62]:
#Museums without any paintings
joinCol = "museum_id"
joinedDf = museumDf.join(workDf, workDf[joinCol] == museumDf[joinCol], "leftanti").show()

+---------+----+-------+----+-----+------+-------+-----+---+
|museum_id|name|address|city|state|postal|country|phone|url|
+---------+----+-------+----+-----+------+-------+-----+---+
+---------+----+-------+----+-----+------+-------+-----+---+



In [45]:
#paintings where sale price is higher than regular price
productSizeDf.filter(productSizeDf["sale_price"]>productSizeDf["regular_price"]).show()

+-------+-------+----------+-------------+
|work_id|size_id|sale_price|regular_price|
+-------+-------+----------+-------------+
+-------+-------+----------+-------------+



In [46]:
#paintings where sale price is half than the regular price
productSizeDf.filter(productSizeDf["sale_price"]>productSizeDf["regular_price"]/2).show()


+-------+-------+----------+-------------+
|work_id|size_id|sale_price|regular_price|
+-------+-------+----------+-------------+
| 160228|     24|        85|           85|
| 160228|     30|        95|           95|
| 160236|     24|        85|           85|
| 160236|     30|        95|           95|
| 160244|     24|        85|           85|
| 160244|     30|        95|           95|
| 160252|     24|        85|           85|
| 160252|     30|        95|           95|
| 160260|     24|        75|           75|
| 160260|     30|        95|           95|
| 160268|     24|        85|           85|
| 160268|     30|        95|           95|
| 125752|     30|        95|           95|
| 125752|     24|        85|           85|
| 125752|   3024|       305|          535|
| 125752|   3226|       325|          575|
| 125752|   3629|       375|          675|
| 125752|   4030|       405|          735|
| 125752|   4836|       495|          915|
| 125752|   6048|       675|         1275|
+-------+--

In [47]:
#Which canva size costs the most
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.orderBy(F.desc("sale_price"))
productSizeRanked = productSizeDf.withColumn("rnk", F.rank().over(windowSpec))
mostExpensiveCanva = productSizeRanked.filter("rnk == 1")\
                        .join(canvasSizeDf, canvasSizeDf["size_id"].cast("string") == productSizeRanked["size_id"], "inner")\
                        .select(canvasSizeDf["label"].alias("canva"), productSizeRanked["sale_price"])
mostExpensiveCanva.show()

+--------------------+----------+
|               canva|sale_price|
+--------------------+----------+
|"48"" x 96""(122 ...|      1115|
+--------------------+----------+



In [48]:
#Delete duplicate records from work, product_size, subject and image_link tables
workDfNoDuplicates = workDf.dropDuplicates(subset=["work_id"])
productSizeDfNoDuplicates = productSizeDf.dropDuplicates(subset=["work_id", "size_id"])
subjectDfNoDuplicates = subjectDf.dropDuplicates(subset=["work_id", "subject"])
imageLinkDfNoDuplicates = imageLinkDf.dropDuplicates(subset=["work_id"])

In [49]:
#Identify museum with invalid city information
from pyspark.sql.functions import col
invalidMuseumCity = museumDf.filter(col("city").rlike("^[0-9]"))
invalidMuseumCity.show()

+---------+--------------------+--------------------+---------------+---------------+------+-----------+----------------+--------------------+
|museum_id|                name|             address|           city|          state|postal|    country|           phone|                 url|
+---------+--------------------+--------------------+---------------+---------------+------+-----------+----------------+--------------------+
|       34|The State Hermita...|       Palace Square|              2|Sankt-Peterburg|190000|     Russia| 7 812 710-90-79|https://www.hermi...|
|       36|     Museum Folkwang|      Museumsplatz 1|          45128|          Essen|  NULL|    Germany|  49 201 8845000|https://www.museu...|
|       37|  Museum of Grenoble|  5 Pl. de Lavalette|          38000|       Grenoble|  NULL|     France|33 4 76 63 44 44|https://www.musee...|
|       38|Musée des Beaux-A...|40 Pl. Saint-Core...|          29000|        Quimper|  NULL|     France|33 2 98 95 45 20|https://www.mbaq....|

In [50]:
#fetch top 10 most famous painting subject
joinedDf = workDf.join(subjectDf, subjectDf["work_id"] == workDf["work_id"])
groupedDf = joinedDf.groupBy("subject").agg(F.count("*").alias("number_of_paintings"))
windowSpec = Window.orderBy(F.desc("number_of_paintings"))
rankedDf = groupedDf.withColumn("rnk", F.rank().over(windowSpec))
top10SubjectDf = rankedDf.filter("rnk<11")
top10SubjectDf.show()

+-------------------+-------------------+---+
|            subject|number_of_paintings|rnk|
+-------------------+-------------------+---+
|          Portraits|               1070|  1|
|Abstract/Modern Art|                575|  2|
|               Nude|                525|  3|
|      Landscape Art|                495|  4|
|       Rivers/Lakes|                480|  5|
|            Flowers|                457|  6|
|         Still-Life|                395|  7|
|          Seascapes|                326|  8|
|Marine Art/Maritime|                268|  9|
|             Horses|                265| 10|
+-------------------+-------------------+---+



In [51]:
#Identify the museums which are open on both Sunday and Monday. Display museum name, city.
joinedDf = museumHoursDf.alias('mh').join(museumDf.alias('m'), F.col('m.museum_id') == F.col('mh.museum_id'))
sundayOpenMuseumDf = joinedDf.filter((F.col('mh.day') == 'Sunday'))
#mondayOpenMuseumDf = joinedDf.filter((F.col('mh.day') == 'Monday'))
mondayOpenMuseumDf = joinedDf.alias("joined").join(museumHoursDf.alias('mh2'), ['museum_id', 'day']).filter((F.col('mh2.day') == 'Monday'))
resultDf = sundayOpenMuseumDf.join(mondayOpenMuseumDf, ['museum_id']).select('m.name', 'm.city', 'm.state', 'm.country').distinct()
print("Number of the museums open on both sunday and monday is : ", resultDf.count())
resultDf.show()

Number of the museums open on both sunday and monday is :  28
+--------------------+------------+--------+--------------+
|                name|        city|   state|       country|
+--------------------+------------+--------+--------------+
| Norton Simon Museum|    Pasadena|      CA|           USA|
|National Gallery ...|   Melbourne|Victoria|     Australia|
|Smithsonian Ameri...|  Washington|      DC|           USA|
|Philadelphia Muse...|Philadelphia|      PA|           USA|
|     Musée du Louvre|       75001|   Paris|        France|
|    The Tate Gallery|      London| England|            UK|
|National Maritime...|      London|SE10 9NF|United Kingdom|
|Los Angeles Count...| Los Angeles|      CA|           USA|
|The Metropolitan ...|    New York|      NY|           USA|
|    National Gallery|      London| England|            UK|
|  Museum of Grenoble|       38000|Grenoble|        France|
|National Gallery ...|  Washington|      DC|           USA|
|The Art Institute...|     Chicago|   

In [52]:
#How many museums are open every day
daysCountMuseumDf = museumHoursDf.groupBy("museum_id").agg(F.countDistinct("day").alias("days_open"))
allDaysOpenMuseumDf = daysCountMuseumDf.filter("days_open = 7")
print("total number of museum open every day : ", allDaysOpenMuseumDf.count())

total number of museum open every day :  17


In [53]:
#Which are the top 5 most popular museum? (Popularity is defined based on most no of paintings in a museum)
joinedDf = workDf.join(museumDf.withColumnRenamed("museum_id", "mid").withColumnRenamed("name", "museum_name"), workDf["museum_id"] == F.col("mid"))
countPerMuseumDf = joinedDf.groupBy("museum_id", "name", "city", "country").agg(F.count("*").alias("number_of_paintings"))
windowSpec = Window.orderBy(F.desc("number_of_paintings"))
rankedDf = countPerMuseumDf.withColumn("rnk", F.rank().over(windowSpec))
top5MuseumsDf = rankedDf.filter("rnk < 6").select("name", "city", "country", "number_of_paintings")
top5MuseumsDf.show()

+--------------------+------------+-----------+-------------------+
|                name|        city|    country|number_of_paintings|
+--------------------+------------+-----------+-------------------+
|   George Washington|    New York|        USA|                  6|
|       Self-Portrait|   Amsterdam|Netherlands|                  6|
| Portrait of a Woman|   Cleveland|        USA|                  5|
| Landscape (Paysage)|Philadelphia|        USA|                  5|
|The Virgin and Child|      London|         UK|                  3|
| Portrait of a Woman|    New York|        USA|                  3|
|Street of L'Hermi...|  Washington|        USA|                  3|
|   Portrait of a Man|    New York|        USA|                  3|
|Portrait of a You...|      London|         UK|                  3|
|       Self-Portrait|       75001|     France|                  3|
+--------------------+------------+-----------+-------------------+



In [54]:
#Who are the top 5 most popular artist? (based on number of paitings) 
joinedDf = workDf.join(artistDf.withColumnRenamed("artist_id", "id"), workDf["artist_id"]==F.col("id"))
countPerArtistDf = joinedDf.groupBy("artist_id").agg(F.count("*").alias("Number_of_paintings"))
countPerArtistDf.sort("Number_of_paintings", ascending=[False]).take(5)

[Row(artist_id=500, Number_of_paintings=469),
 Row(artist_id=550, Number_of_paintings=378),
 Row(artist_id=677, Number_of_paintings=308),
 Row(artist_id=649, Number_of_paintings=253),
 Row(artist_id=559, Number_of_paintings=233)]

In [55]:
#Display the 3 least popular canva sizes
joinedDfIntermediate = workDf.join(productSizeDf.withColumnRenamed("work_id", "ps_work_id"), F.col("work_id") == F.col("ps_work_id")).join(canvasSizeDf.withColumnRenamed("size_id", "cs_size_id"), F.col("size_id") == F.col("cs_size_id"))
countPerSize = joinedDfIntermediate.groupBy("cs_size_id", "label").agg(F.count("*").alias("number_of_paintings"))
rankedDf = countPerSize.withColumn("rnk", F.dense_rank().over(Window.orderBy(F.desc("number_of_paintings"))))
rankedDf.filter("rnk<4").select("label", "rnk", "number_of_paintings").show()


+--------------------+---+-------------------+
|               label|rnk|number_of_paintings|
+--------------------+---+-------------------+
|"36"" x 48""(91 c...|  1|               4647|
|"30"" x 40""(76 c...|  1|               4647|
|"29"" x 36""(74 c...|  2|               4622|
|"48"" x 60""(122 ...|  3|               4550|
+--------------------+---+-------------------+



In [112]:
#Which museum is open for the longest during a day. Display museum name, state and hours open and which day
def format_time_udf(time_string):
    hours, minutes, period = time_string.split(":")
    return f"{hours}:{minutes} {period}"
format_time = F.udf(format_time_udf)
updatedMuseumHoursDf = museumHoursDf.withColumn( "open", format_time("open")).withColumn("open", F.to_timestamp("open", "h:mm a")).withColumn( "close", format_time("close")).withColumn("close", F.to_timestamp("open", "h:mm p"))
windowSpec = Window.orderBy(F.desc("duration"))
resultDf = museumHoursDf.join(museumDf, "museum_id").withColumn("duration", F.col("close").cast("long")-F.col("open").cast("long"))
updatedMuseumHoursDf.show()

+---------+---------+-------------------+-------------------+
|museum_id|      day|               open|              close|
+---------+---------+-------------------+-------------------+
|       30|   Sunday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30|   Monday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30|  Tuesday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30|Wednesday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30|  Thusday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30|   Friday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       30| Saturday|1970-01-01 10:30:00|1970-01-01 10:30:00|
|       31|   Sunday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       31|   Monday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       31|  Tuesday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       31|Wednesday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       31|  Thusday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       31|   Friday|1970-01-01 11:00:00|1970-01-01 11:00:00|
|       