In [1]:
from pyspark.sql import SparkSession
import os
import configparser
import  pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [19]:
df = spark.read.csv("Co2.csv",sep=",", inferSchema=True, header=True)

In [20]:
df.printSchema()
df.show(5)

root
 |-- Day: integer (nullable = true)
 |-- Samples: integer (nullable = true)
 |-- Mean: double (nullable = true)
 |-- Median: double (nullable = true)
 |-- Stdev: double (nullable = true)

+--------+-------+------------------+------------------+------------------+
|     Day|Samples|              Mean|            Median|             Stdev|
+--------+-------+------------------+------------------+------------------+
|20020901|     41| 372.8049011230469| 372.7300109863281|3.2957961559295654|
|20020901|     56| 370.8941345214844|370.68951416015625|2.3466877937316895|
|20020902|     39| 373.6762390136719| 373.3349914550781| 2.734053611755371|
|20020903|     30|373.72796630859375|373.25299072265625| 2.933104991912842|
|20020903|     43| 371.3970031738281| 371.3280029296875|3.5585501194000244|
+--------+-------+------------------+------------------+------------------+
only showing top 5 rows



In [21]:
dfDate = df.withColumn("NewDay", F.to_timestamp("Day"))
dfDate.printSchema()
dfDate.show(5)

root
 |-- Day: integer (nullable = true)
 |-- Samples: integer (nullable = true)
 |-- Mean: double (nullable = true)
 |-- Median: double (nullable = true)
 |-- Stdev: double (nullable = true)
 |-- NewDay: timestamp (nullable = true)

+--------+-------+------------------+------------------+------------------+-------------------+
|     Day|Samples|              Mean|            Median|             Stdev|             NewDay|
+--------+-------+------------------+------------------+------------------+-------------------+
|20020901|     41| 372.8049011230469| 372.7300109863281|3.2957961559295654|1970-08-20 17:21:41|
|20020901|     56| 370.8941345214844|370.68951416015625|2.3466877937316895|1970-08-20 17:21:41|
|20020902|     39| 373.6762390136719| 373.3349914550781| 2.734053611755371|1970-08-20 17:21:42|
|20020903|     30|373.72796630859375|373.25299072265625| 2.933104991912842|1970-08-20 17:21:43|
|20020903|     43| 371.3970031738281| 371.3280029296875|3.5585501194000244|1970-08-20 17:21:43

In [None]:
dfDate = df.withColumn("month", F.month("payment_date"))

In [3]:
from pyspark.sql.types import StructType as R, StructField as Fld, StringType as Str, DoubleType as Dbl,  IntegerType as Int, DateType as Date
Co2Schema = R([
    Fld("Day",Date()),
    Fld("Samples",Int()),
    Fld("Mean",Dbl()),
    Fld("Median",Dbl()),
    Fld("Stdev",Dbl()),
])

In [4]:
dfCo2WithSchema = spark.read.csv("Co2.csv", sep=",", schema=Co2Schema, header=True)

In [5]:
dfCo2WithSchema.printSchema()
dfCo2WithSchema.show(5)

root
 |-- Day: date (nullable = true)
 |-- Samples: integer (nullable = true)
 |-- Mean: double (nullable = true)
 |-- Median: double (nullable = true)
 |-- Stdev: double (nullable = true)

+----------+-------+------------------+------------------+------------------+
|       Day|Samples|              Mean|            Median|             Stdev|
+----------+-------+------------------+------------------+------------------+
|2002-09-01|     41| 372.8049011230469| 372.7300109863281|3.2957961559295654|
|2002-09-01|     56| 370.8941345214844|370.68951416015625|2.3466877937316895|
|2002-09-02|     39| 373.6762390136719| 373.3349914550781| 2.734053611755371|
|2002-09-03|     30|373.72796630859375|373.25299072265625| 2.933104991912842|
|2002-09-03|     43| 371.3970031738281| 371.3280029296875|3.5585501194000244|
+----------+-------+------------------+------------------+------------------+
only showing top 5 rows



In [22]:
df = dfCo2WithSchema.groupby('Day').agg(F.mean('Mean').alias('newMean'))
df.show(5)

+----------+------------------+
|       Day|           newMean|
+----------+------------------+
|2002-12-06| 371.7021026611328|
|2002-12-25|   373.50634765625|
|2005-01-16|378.61126708984375|
|2005-06-06| 379.7440185546875|
|2006-05-17|379.89076741536456|
+----------+------------------+
only showing top 5 rows



In [55]:
dfCo2WithSchema.createOrReplaceTempView("Co2")
spark.sql("""
    SELECT year(Day) as year, month(Day) as month, Avg(Mean) as Mean
    FROM Co2
    GROUP by year, month
""").show()

+----+-----+------------------+
|year|month|              Mean|
+----+-----+------------------+
|2005|    5|378.88365695901115|
|2004|    6| 376.0786543048796|
|2005|   10|379.09751073864925|
|2003|    2|374.76837800678453|
|2004|    8| 376.0771308898926|
|2002|   12| 373.1512968275282|
|2006|   12| 380.9201279628424|
|2007|    1|381.72070187085296|
|2003|   10|375.96485900878906|
|2004|   10| 376.8089548746745|
|2006|    7|380.28902882543105|
|2005|    6| 378.8884950402665|
|2003|    3| 374.7901023415958|
|2002|   11| 372.5125354003906|
|2006|    6|380.35894947320645|
|2003|    9| 375.7058285444211|
|2003|   12| 374.9952166521991|
|2003|    1|  373.959287076383|
|2005|   11| 379.0090730794271|
|2004|    7| 376.2038090693486|
+----+-----+------------------+
only showing top 20 rows



In [21]:
output_data = "parquet"
# extract columns
Co2_table = df
# write table to parquet files
Co2_table.write \
.mode("overwrite") \
.format("parquet") \
.save("Co2.parquet")

In [27]:
# read in song data to use for Co2 table
Co2_df = spark.read.parquet("Co2.parquet")
Co2_df.count()

1568

In [26]:
# Write code here
Co2_df.createOrReplaceTempView("Co2")
spark.sql("""
    SELECT *
    FROM Co2
    GROUP Order by Day ASC
""").show()

+----------+------------------+
|       Day|           newMean|
+----------+------------------+
|2002-09-01| 371.8495178222656|
|2002-09-02| 373.6762390136719|
|2002-09-03|372.56248474121094|
|2002-09-04|372.17618560791016|
|2002-09-05| 372.4253387451172|
|2002-09-06|371.29062906901044|
|2002-09-07| 371.8840637207031|
|2002-09-08| 371.2864227294922|
|2002-09-09|   371.52001953125|
|2002-09-10| 370.8514404296875|
|2002-09-11|373.46527099609375|
|2002-09-13|371.64884185791016|
|2002-09-14|   372.70751953125|
|2002-09-15| 371.2467447916667|
|2002-09-16| 372.9146423339844|
|2002-09-17| 372.0543518066406|
|2002-09-18|372.36448669433594|
|2002-09-19|371.03106689453125|
|2002-09-20|372.19069417317706|
|2002-09-21|371.51136779785156|
+----------+------------------+
only showing top 20 rows

