In [2]:
# pyspark initialization
# create sparkcontext
# create sparksession
# csv file import, read it with some options (header, comma seperation etc)

import pyspark
import findspark
findspark.init()
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext 
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("SparkSqlProject.com") \
    .getOrCreate()
df = df = spark.read.option("header","true").option("sep", ",").option("multiLine", "true").option("quote","\"").option("escape","\"").option("ignoreTrailingWhiteSpace", True).csv("/Users/evangelosgeraridis/Desktop/ddcdm/decentralized/Monthly_Recycling_and_Waste_Collection_Statistics.csv")
df.show(30)
df.printSchema() # view of our given dataset

# PREPROCESING


from pyspark.sql.functions import *
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show() # check if there are nan values in our dataframe

# we want date to be datetype, so we tranform it. The reason is that we want to isolate the years from the whole datatime.(We want seperate collumns of year months etc)


# The collumn named TOTAL  (IN TONS) contains entries that are strings. We want them to become integers (we don't have decimal points) , so that we can sum them 
# eg when we will need to sum grouped by a collumn as TYPE




from pyspark.sql.types import IntegerType
df = df.withColumn("TOTAL (IN TONS)", df["TOTAL (IN TONS)"].cast(IntegerType()))
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

df = df.withColumn(
            'DATE',
                F.to_date(
                    F.unix_timestamp('DATE', 'MM/dd/yyyy').cast('timestamp')))
df.printSchema() #check

#FIRST QUERY

averagedataframe = df.groupBy("TYPE").agg({"TOTAL (IN TONS)":"avg"} ) # we want to find the average in tons of each recycled material through the years (counting from 2011 to 2022)
averagedataframe = averagedataframe.withColumnRenamed('avg(TOTAL (IN TONS))', "AVERAGE OF RECYCLED TYPES")  # changing the collumns name to avoid confusions
# we created a new dataframe that contains this info not neccesary 
averagedataframe.show()


# FIRST QUERY DONE


# SECOND QUERY


# Rename TOTAL (IN TONS) to TOTAL
df = df.withColumnRenamed('TOTAL (IN TONS)', "TOTAL")

# data is the df to be used in the second query
data = df.withColumn('YEAR', year(col('DATE'))) # since DATE contains datetype entries, in the original df, we create a new column 
                                                        # named year that contains the year of each date entry  
                                                        # since some of the next queries are about years

data = data.groupBy("TYPE" , "YEAR").agg({"TOTAL":"sum"}) # dataset named data now contains the total recycled material quantity in tons per year 
                                                        # time for the actual query


# we change the sum(TOTAL) name to just SUM..
#  SUM represents the total quantity of recycled material in tons of each material per year
data = data.withColumnRenamed('sum(TOTAL)', "SUM")

data.show()
# for our query we create a new df named df2 which contains 6 materials of our choice
df2 = data.filter((data.TYPE == "Curb Garbage") | (data.TYPE == "Scrap Metal") | (data.TYPE == "Misc. Recycling") | (data.TYPE == "Asphalt Debris") | (data.TYPE == "Haz Waste") | (data.TYPE == "E-Waste"))
df2.show()
# # df2 contains 6 materials of our choice
# using window partition by (YEAR since we want material recycled quantites per year), we will rank them 
# mterial with rank 1 will the most recycled one (descending order) from all those six 
# materials and then by querying we will see where the rank 1 coresponds to the material we chose

# since scrap metal is never the maximum from those 6 materials chosen, we will pick curb garbage

# here we use  window partition by YEAFR in the dataframe data ( based on our original df df, with the difference that we have isolated the years from date 
# ,so we have each materials recycled quantity per year. Using partition by, we see the 6 elements (those are the chosen ones )
# with descending order (maximum value to minimum) (regarding the each materials recycled quantity ) per year(partitionBy("YEAR"))

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
df2win = Window.partitionBy("YEAR").orderBy(col("SUM").desc())
df2=df2.withColumn("row",row_number().over(df2win)) # βάζουμε νέα στήλη την row (που έχει το rank )

df2.show(40) # we show the first 40 entries 
# we notice that in some materials the recycled quantity has not been recorded (not 0 or nan , just a missing entry ) (we can see this from the dimensions of the df  , so it is possible that in some year 
# the rank collumn  to have a 4 or 5 entry meaning the minimum recycled quantity of that material each year and not 6 which would be the min

df2 = df2.filter((col("row") == 1) & (col("TYPE") == 'Curb Garbage'))
df2 = df2.drop('row')
df2.orderBy('YEAR').show()
df2.count()

# filter because we care about the maximum (col("row") == 1 ,  rank = 1 and at the same time we want the TYPE collumns entries to be 
# curb garbage, in other words curb garbage as the material and where it is the maximum from the other 5 chosen
# orderBy year just for showing porpuse (not needed) , and dropping  the rank collumn for the same reason 
# count , counts how many times the material curb garabage has the most recycled quantity per year from the other 5 chosen materials 
# df2 answers our query
# if we don't want to see the recycled quantity( the sum (we added all the recycled quantities from our material in each datetime so we can have the sum per year
# we just drop the collumn sum but we left it 
# We see that curb garbage had the most recycled ammount from the other 5 chosen materials for 12 consecutive years 
#in order to check, if we do the same with any other material like the scrap metal that was chosen, we will see that it is never the maximum for all those 12 years, as expected


# SECOND QUERY DONE



# THIRD QUERY

# simmilarly ,  but for  all the materials together we will filter only ranks <= 5 
# (5 materials with the most recycled quantities per year)

df3win = Window.partitionBy("YEAR").orderBy(col("SUM").desc())
df3=data.withColumn("row",row_number().over(df3win)) # data is the df we want ( all the materials and their total recycled quantity of the materials per year (SUM))

df3 = df3.filter(col("row") <= 5) # filter
df3.orderBy('YEAR').show(60) # order by year just like in query 2, for showing purposes
# # top 5 materials regarding the reecycled quantity per yearand since we have data from 2011 to 2022 ( 12 years times 5 materials) , we have 
# 60 entries to show at most ( since as we expalined there are some missing records on the dataset


# THIRD QUERY DONE


# FOURTH QUERY

# a good way to face this query would be using window just like before , but here we would need partition by TYPE since the query here states which year did each material have  
# its max or min . The problem is that some entries are missing as stated before, so in order to not exploit the domain knowledge, we will go with another way.


# First and foremost the data dataframe contains the information of our original df with each year isolated in a new column  
# based on this dataframe we will run a partition by TYPE  ( can be done with a single groupBy TYPE since we want the min and max values of each material,
# but by using groupBy a column ,won't show as the other column asked here which is the year column  


df4win = Window.partitionBy("TYPE").orderBy(col("TYPE").desc()) #orderBy TYPE because we want for each material
# we create the new df4_min which will have only the min values of each material for every single year and we place it in a new column named min (withColumn).
# sum is known as of before (sum of each material per year
df4_min = data.withColumn( 'MIN' , min('SUM').over(df4win))
df4_min = df4_min.filter(df4_min.MIN == df4_min.SUM) # filter since we want to only see the entries where each materials recycled quantity is equal 
# with the minimum and in order for us not to have it duplicated , we drop the sum 
# we leave the minimum quantity to show (not needed) since we only want the year where each material had the least recycled quantity compared to all the other materials.
df4_min = df4_min.drop('SUM')
df4_min.show(20)
# after showing the new df we can see that for example Sidewalk Debris had its minimum recycled quantity in χρονιά 2011 with 3900 tons of recycled material.
# and Haz waste has a lot of minimum recycled quantities through the years since it had  0 recycle ammount for more than one years
#  df4_max now similarly will be used for the years that each material had its maximum recycled ammount

df4_max = data.withColumn( 'MAX' , max('SUM').over(df4win))
df4_max = df4_max.filter(df4_max.MAX == df4_max.SUM)
df4_max = df4_max.drop('SUM')
df4_max.show(50)
# here for example Haz Wastehad its maximum recycled ammount in  2014  with 37 tons.

# we could join these two dataframes (max andmin) bdepending on the TYPE collumn but there could be a difference in the number of the entries  ( since one material can have its minimum recycled ammount for a lot of years  
# or its maximum value 
# FOURTH QUERY DONE




# FIFTH QUERY

# Since we need months we will use the original df and not the data dataframe since it does not have a month collumn.

df5 = df.groupBy('MONTH').agg({"TOTAL":"sum"})
df5 = df5.orderBy(col("sum(TOTAL)").desc())
df5 =  df5.withColumnRenamed('sum(TOTAL)', "TOTAL OF RECYCLED MATERIALS PER MONTHS REGARDLESS OF THE YEAR")
df5.show(5) #we have ordered them, so we pick 5 as stated
# regardless of the type and year , so we see for instance that month December (all the Decembers combined from 2011 to 2022) has  
# the maximum total of all the recycled materials combined with 
#  215804 tons,  May come next with 177436 tons  .... and fifth comes September with 166864 tons 


# FIFTH QUERY DONE 



# SIXTH QUERY

#Simmilar to fifth but per year , for instance the year 2020 had a total of x out of all the recycled materials ammounts combined 
df6 = data.groupBy('YEAR').agg({"SUM":"sum"})
df6 =  df6.withColumnRenamed('sum(SUM)', "TOTAL OF RECYCLED MATERIALS PER YEAR") # just a name change for better understanding
df6 = df6.orderBy(col("YEAR").desc()) # year desc order for showing purposes
df6.show(12) # since 2011 to 2022 is 12 years

# SIXTH QUERY DONE 

# Alternative statement of this query

#Since the way this query is stated someone could misunderstand it, in case we wanted to show the total recycled quantity of each material 
# per each year. This information is already saved in the  dataframe data. We had found the year and the total sum of each recycled material
# for every single year since we had isolated first and foremost the  YEAR col. We had placed in the data dataframe the same dataframe data grouped by TYPE
# and  YEAR. So now with this considered if we just show (data.show(144)) (at most 144 entries since 12 materials times 12 years of records in each material ):

df77 = data
df77 = df77.withColumnRenamed('SUM' , 'TOTAL OF RECYCLED QUANTITY OF EACH MATERIAL PER YEAR')
df77 = df77.orderBy('YEAR')
#df77 = df77.orderBy('YEAR')
df77.show(144)



# SEVENTH QUERY

# we achieve a simmilar thing here, but instead of groupBy type and year we will groupby only the TYPE col our original df and since we want the aggregation sum
# per TYPE regardless of the year or date this materials were recycled
# and then an unnecessary orderby just to show them from the maximum to the minimum for showing purposes


df7 = df.groupBy("TYPE").agg({"TOTAL":"sum"})
df7 = df7.withColumnRenamed('sum(TOTAL)', "TOTAL OF RECYCLED TONS OF EACH MATERIAL")
df7 = df7.orderBy(col("TOTAL OF RECYCLED TONS OF EACH MATERIAL").desc())
df7.show(12)

#12 materials so we have just 12 entries

# Concluding, Curb Garbage had the maximum recycled ammount through these 12 years with 1074974 tons (total from 12 years of records)
# and the 12th and last regarding each recycled ammount through these years is Haz Waste with just 130 tons of recycled material 

# SEVENTH QUERY DONE
# END OF CODE




                                                                                

+----------+--------+---------------+---------------+
|      DATE|   MONTH|           TYPE|TOTAL (IN TONS)|
+----------+--------+---------------+---------------+
|01/31/2011| January| Curb Recycling|            699|
|01/31/2011| January|Misc. Recycling|              0|
|01/31/2011| January|    Bottle Bill|            307|
|01/31/2011| January|    Scrap Metal|              0|
|01/31/2011| January| Recycled Tires|             14|
|01/31/2011| January|     Yard Waste|            269|
|01/31/2011| January| Asphalt Debris|              0|
|01/31/2011| January|Sidewalk Debris|              0|
|01/31/2011| January|      Haz Waste|              0|
|01/31/2011| January|        E-Waste|              0|
|01/31/2011| January|   Curb Garbage|           6816|
|01/31/2011| January|  Misc. Garbage|            287|
|02/28/2011|February| Curb Recycling|            606|
|02/28/2011|February|Misc. Recycling|              0|
|02/28/2011|February|    Bottle Bill|            307|
|02/28/2011|February|    Scr

                                                                                

+---------------+-------------------------+
|           TYPE|AVERAGE OF RECYCLED TYPES|
+---------------+-------------------------+
| Curb Recycling|       1271.1791044776119|
|     Yard Waste|        696.7089552238806|
| Asphalt Debris|        4333.333333333333|
|Misc. Recycling|        309.1641791044776|
| Recycled Tires|       24.044776119402986|
|      Haz Waste|       0.9701492537313433|
|Sidewalk Debris|        393.9166666666667|
|    Bottle Bill|        316.1044776119403|
|        E-Waste|        28.29850746268657|
|    Scrap Metal|       284.85820895522386|
|   Curb Garbage|        8022.194029850746|
|  Misc. Garbage|       1342.4925373134329|
+---------------+-------------------------+



                                                                                

+---------------+----+------+
|           TYPE|YEAR|   SUM|
+---------------+----+------+
|Sidewalk Debris|2014|  5004|
|        E-Waste|2014|   469|
|   Curb Garbage|2013|113856|
|    Scrap Metal|2016|  4056|
|    Scrap Metal|2012|     0|
|        E-Waste|2012|   215|
|        E-Waste|2017|   435|
|        E-Waste|2018|   410|
|   Curb Garbage|2016| 87325|
|    Bottle Bill|2022|   634|
| Recycled Tires|2021|   269|
| Curb Recycling|2011|  8864|
|        E-Waste|2015|   428|
| Recycled Tires|2020|   264|
| Asphalt Debris|2013| 60000|
| Curb Recycling|2015| 14944|
|Misc. Recycling|2019|  5210|
| Curb Recycling|2021| 18229|
|  Misc. Garbage|2012| 20948|
| Recycled Tires|2012|   343|
+---------------+----+------+
only showing top 20 rows



                                                                                

+---------------+----+------+
|           TYPE|YEAR|   SUM|
+---------------+----+------+
|        E-Waste|2014|   469|
|   Curb Garbage|2013|113856|
|    Scrap Metal|2016|  4056|
|    Scrap Metal|2012|     0|
|        E-Waste|2012|   215|
|        E-Waste|2017|   435|
|        E-Waste|2018|   410|
|   Curb Garbage|2016| 87325|
|        E-Waste|2015|   428|
| Asphalt Debris|2013| 60000|
|Misc. Recycling|2019|  5210|
|    Scrap Metal|2014|  4140|
|      Haz Waste|2014|    37|
|        E-Waste|2020|   360|
|Misc. Recycling|2020|  5210|
|    Scrap Metal|2017|  3976|
|      Haz Waste|2011|     0|
|    Scrap Metal|2021|  3984|
|Misc. Recycling|2015|  5158|
|   Curb Garbage|2019| 88479|
+---------------+----+------+
only showing top 20 rows



                                                                                

+---------------+----+------+---+
|           TYPE|YEAR|   SUM|row|
+---------------+----+------+---+
|   Curb Garbage|2011|102404|  1|
| Asphalt Debris|2011| 28000|  2|
|Misc. Recycling|2011|     1|  3|
|      Haz Waste|2011|     0|  4|
|        E-Waste|2011|     0|  5|
|    Scrap Metal|2011|     0|  6|
|   Curb Garbage|2012| 95045|  1|
| Asphalt Debris|2012| 60000|  2|
|        E-Waste|2012|   215|  3|
|Misc. Recycling|2012|    14|  4|
|      Haz Waste|2012|     6|  5|
|    Scrap Metal|2012|     0|  6|
|   Curb Garbage|2013|113856|  1|
| Asphalt Debris|2013| 60000|  2|
|    Scrap Metal|2013|  4057|  3|
|        E-Waste|2013|   412|  4|
|Misc. Recycling|2013|    63|  5|
|      Haz Waste|2013|     7|  6|
|   Curb Garbage|2014|113109|  1|
| Asphalt Debris|2014| 60000|  2|
|Misc. Recycling|2014|  4912|  3|
|    Scrap Metal|2014|  4140|  4|
|        E-Waste|2014|   469|  5|
|      Haz Waste|2014|    37|  6|
|   Curb Garbage|2015| 88936|  1|
|Misc. Recycling|2015|  5158|  2|
|    Scrap Met

                                                                                

+------------+----+------+
|        TYPE|YEAR|   SUM|
+------------+----+------+
|Curb Garbage|2011|102404|
|Curb Garbage|2012| 95045|
|Curb Garbage|2013|113856|
|Curb Garbage|2014|113109|
|Curb Garbage|2015| 88936|
|Curb Garbage|2016| 87325|
|Curb Garbage|2017| 89250|
|Curb Garbage|2018| 89162|
|Curb Garbage|2019| 88479|
|Curb Garbage|2020| 97761|
|Curb Garbage|2021| 97926|
|Curb Garbage|2022| 11721|
+------------+----+------+



                                                                                

+---------------+----+------+---+
|           TYPE|YEAR|   SUM|row|
+---------------+----+------+---+
| Curb Recycling|2011|  8864|  4|
| Asphalt Debris|2011| 28000|  2|
|Sidewalk Debris|2011|  3900|  5|
|  Misc. Garbage|2011| 23736|  3|
|   Curb Garbage|2011|102404|  1|
|   Curb Garbage|2012| 95045|  1|
| Curb Recycling|2012| 13177|  4|
|  Misc. Garbage|2012| 20948|  3|
|Sidewalk Debris|2012|  5000|  5|
| Asphalt Debris|2012| 60000|  2|
|   Curb Garbage|2013|113856|  1|
| Asphalt Debris|2013| 60000|  2|
| Curb Recycling|2013| 14106|  3|
|     Yard Waste|2013|  5523|  4|
|Sidewalk Debris|2013|  5004|  5|
| Curb Recycling|2014| 14669|  3|
|     Yard Waste|2014|  5868|  4|
|Sidewalk Debris|2014|  5004|  5|
|   Curb Garbage|2014|113109|  1|
| Asphalt Debris|2014| 60000|  2|
| Curb Recycling|2015| 14944|  3|
|   Curb Garbage|2015| 88936|  1|
|  Misc. Garbage|2015| 27042|  2|
|     Yard Waste|2015| 11433|  4|
|Misc. Recycling|2015|  5158|  5|
|  Misc. Garbage|2016| 19843|  2|
| Curb Recycli