In [11]:
import findspark
import pdb
findspark.init()
import pyspark
from functools import reduce
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import desc, countDistinct, split, col, lit, avg, asc, sum, row_number, round
spark = SparkSession.builder.appName("Homework").getOrCreate()

In [2]:
from os import listdir
from os.path import isfile, join
mypath = "input/"
onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]

In [3]:
dfL = [] #list of dataframes
for file in onlyfiles:
    df = spark.read.text(mypath+file)
    
    # splitting the string column into separate columns and cast the type
    split_col = split(df['value'], ' ')
    df = df.select(split_col.getItem(0).alias('month'),split_col.getItem(1).alias('revenue').cast("integer"))
    df = df.withColumn("store",lit(file[:-4]))
    
    if "_" in file:
        df = df.withColumn("city",lit(file[:-6]))
    else:
        df = df.withColumn("city",lit(file[:-4]))
    
    dfL.append(df)
    
dfAll = reduce(DataFrame.unionAll, dfL)

In [5]:
dfAll.show()

                                                                                

+-----+-------+------------+----------+
|month|revenue|       store|      city|
+-----+-------+------------+----------+
|  JAN|     21|      troyes|    troyes|
|  FEB|     21|      troyes|    troyes|
|  MAR|     11|      troyes|    troyes|
|  APR|     17|      troyes|    troyes|
|  MAY|     15|      troyes|    troyes|
|  JUN|     25|      troyes|    troyes|
|  JUL|     11|      troyes|    troyes|
|  AUG|     22|      troyes|    troyes|
|  SEP|     21|      troyes|    troyes|
|  OCT|     28|      troyes|    troyes|
|  NOV|     11|      troyes|    troyes|
|  DEC|     11|      troyes|    troyes|
|  JAN|     11|marseilles_2|marseilles|
|  FEB|     11|marseilles_2|marseilles|
|  MAR|     11|marseilles_2|marseilles|
|  APR|     17|marseilles_2|marseilles|
|  MAY|     12|marseilles_2|marseilles|
|  JUN|     25|marseilles_2|marseilles|
|  JUL|     21|marseilles_2|marseilles|
|  AUG|     22|marseilles_2|marseilles|
+-----+-------+------------+----------+
only showing top 20 rows



- Average monthly income of the shop in France

In [6]:
dfAll.groupBy('month').agg(round(avg("revenue"),3).alias("average_revenue")).orderBy(desc("average_revenue")).show()



+-----+---------------+
|month|average_revenue|
+-----+---------------+
|  DEC|           29.0|
|  JUN|         27.846|
|  OCT|         26.538|
|  SEP|         25.538|
|  NOV|         24.538|
|  AUG|         23.077|
|  MAY|         22.462|
|  JUL|         21.692|
|  JAN|         20.769|
|  APR|         20.231|
|  FEB|         19.154|
|  MAR|         17.538|
+-----+---------------+



                                                                                

- Average monthly income of the shop in each city

In [7]:
dfAll.groupBy('city','month').agg(round(avg("revenue"),3).alias("average_revenue")).orderBy(desc("average_revenue")).show(30)

+----------+-----+---------------+
|      city|month|average_revenue|
+----------+-----+---------------+
|     paris|  OCT|         56.667|
|     paris|  JUN|           55.0|
|     paris|  DEC|         52.667|
|     paris|  MAY|           50.0|
|     paris|  NOV|         48.667|
|     paris|  SEP|           48.0|
|     paris|  AUG|         41.667|
|     paris|  APR|         38.667|
|     paris|  JAN|         38.333|
|     paris|  JUL|         33.667|
|     paris|  FEB|           33.0|
|      nice|  DEC|           29.0|
|marseilles|  OCT|           28.0|
|    troyes|  OCT|           28.0|
|    nantes|  JUN|           28.0|
|     paris|  MAR|         26.333|
|    orlean|  DEC|           26.0|
|marseilles|  DEC|           26.0|
|    troyes|  JUN|           25.0|
|marseilles|  JUN|           25.0|
|    orlean|  AUG|           25.0|
|      lyon|  AUG|           25.0|
|marseilles|  NOV|           24.0|
|    orlean|  NOV|           24.0|
|    nantes|  DEC|           24.0|
|    rennes|  SEP|  

- Total revenue per city per year

In [8]:
dfAll.groupBy('city').agg(sum("revenue").alias("total_revenue")).orderBy(asc("city")).show()

+----------+-------------+
|      city|total_revenue|
+----------+-------------+
|     anger|          166|
|      lyon|          193|
|marseilles|          515|
|    nantes|          207|
|      nice|          203|
|    orlean|          196|
|     paris|         1568|
|    rennes|          180|
|  toulouse|          177|
|    troyes|          214|
+----------+-------------+



- Total revenue per store per year

In [9]:
dfAll.groupBy('store').agg(sum("revenue").alias("total_revenue")).orderBy(asc("store")).show()

+------------+-------------+
|       store|total_revenue|
+------------+-------------+
|       anger|          166|
|        lyon|          193|
|marseilles_1|          284|
|marseilles_2|          231|
|      nantes|          207|
|        nice|          203|
|      orlean|          196|
|     paris_1|          596|
|     paris_2|          642|
|     paris_3|          330|
|      rennes|          180|
|    toulouse|          177|
|      troyes|          214|
+------------+-------------+



- The store that achieves the best performance in each month

In [10]:
w = Window.partitionBy("month").orderBy(col("revenue").desc())
dfAll = dfAll.withColumn("row_number", row_number().over(w))
dfAll = dfAll.filter(col("row_number") == 1)
dfAll.drop("row_number").orderBy(desc("revenue")).show()

+-----+-------+-------+-----+
|month|revenue|  store| city|
+-----+-------+-------+-----+
|  JUN|     85|paris_2|paris|
|  MAY|     72|paris_2|paris|
|  DEC|     71|paris_1|paris|
|  OCT|     68|paris_1|paris|
|  NOV|     64|paris_2|paris|
|  SEP|     63|paris_2|paris|
|  JUL|     61|paris_1|paris|
|  APR|     57|paris_1|paris|
|  JAN|     51|paris_1|paris|
|  AUG|     45|paris_2|paris|
|  MAR|     44|paris_2|paris|
|  FEB|     42|paris_2|paris|
+-----+-------+-------+-----+

