# Exercise 3 - Data Lake on S3

In [1]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [13]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open("C:\\Users\\arunk01\\Desktop\\Udacity\\Datalake\dwh.cfg"))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [14]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [16]:
df = spark.read.csv("s3a://billingarun/Billing.csv")


In [17]:
df.printSchema()
df.show(5)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|      _c0|           _c1|                 _c2|     _c3|         _c4|         _c5|         _c6|    _c7|
+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|      Demand|  Collection|EFFI(%)|
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 | 1,020.3831 |   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |        -   |     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 | 1,131.4004 | 1,101.6

In [18]:
df = spark.read.csv("s3a://billingarun/Billing.csv",sep=",", inferSchema=True, header=True)

In [19]:
df.printSchema()
df.show(5)

root
 |-- Zone Name: string (nullable = true)
 |-- Main Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Sale: string (nullable = true)
 |-- Demand: string (nullable = true)
 |-- Collection: string (nullable = true)
 |-- EFFI(%): string (nullable = true)

+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|      Demand|  Collection|EFFI(%)|
+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 | 1,020.3831 |   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |        -   |     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 | 1,131.4004 | 1,101.6759 |102.70%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY

In [21]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", inferSchema=True, header=True)

In [22]:
df.printSchema()
df.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



In [23]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5)

root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 15:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 09:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 15:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-28 18:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 02:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows



In [24]:
df = spark.read.csv("s3a://billingarun/Billing.csv",sep=",", inferSchema=True, header=True)

In [25]:
df.printSchema()
df.show(5)

root
 |-- Zone Name: string (nullable = true)
 |-- Main Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Sale: string (nullable = true)
 |-- Demand: string (nullable = true)
 |-- Collection: string (nullable = true)
 |-- EFFI(%): string (nullable = true)

+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|      Demand|  Collection|EFFI(%)|
+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 | 1,020.3831 |   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |        -   |     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 | 1,131.4004 | 1,101.6759 |102.70%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY

# Change String to Float and Integers

In [39]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("Demand", df["Sale"].cast("float"))
dfPayment.printSchema()
dfPayment.show(10)

root
 |-- Zone Name: string (nullable = true)
 |-- Main Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Sale: string (nullable = true)
 |-- Demand: float (nullable = true)
 |-- Collection: string (nullable = true)
 |-- EFFI(%): string (nullable = true)

+---------+--------------+--------------------+--------+------------+--------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|  Demand|  Collection|EFFI(%)|
+---------+--------------+--------------------+--------+------------+--------+------------+-------+
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 |    null|   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |    null|     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 |    null| 1,101.6759 |102.70%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|    (0.0000)|    n

In [36]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("Demand", df["Demand"].cast("float"))
dfPayment.printSchema()
dfPayment.show(10)

root
 |-- Zone Name: string (nullable = true)
 |-- Main Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Sale: string (nullable = true)
 |-- Demand: float (nullable = true)
 |-- Collection: string (nullable = true)
 |-- EFFI(%): string (nullable = true)

+---------+--------------+--------------------+--------+------------+--------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|  Demand|  Collection|EFFI(%)|
+---------+--------------+--------------------+--------+------------+--------+------------+-------+
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 |    null|   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |    null|     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 |    null| 1,101.6759 |102.70%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|    (0.0000)|    n

In [44]:
dfPayment.createOrReplaceTempView("Demand")
spark.sql("""
    SELECT Year, sum(Demand) as demand
    FROM Demand
    GROUP by Year
    order by demand desc
""").show()

+--------+------------------+
|    Year|            demand|
+--------+------------------+
|FY 12-13| 2509.779991894029|
|FY 13-14|2404.7115864753723|
+--------+------------------+



In [45]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
paymentSchema = R([
    Fld("Zone Name",Str()),
    Fld("Main Category",Str()),
    Fld("Sub Category",Str()),
    Fld("Year",Str()),
    Fld("Sale",Dbl()),
    Fld("Demand",Dbl()),
    Fld("Collection",Dbl()),
    Fld("EFFI(%)",Dbl()),
])

In [46]:
dfPaymentWithSchema = spark.read.csv("s3a://billingarun/Billing.csv",sep=",", schema=paymentSchema, header=True)

In [47]:
dfPaymentWithSchema.printSchema()
df.show(5)

root
 |-- Zone Name: string (nullable = true)
 |-- Main Category: string (nullable = true)
 |-- Sub Category: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Sale: double (nullable = true)
 |-- Demand: double (nullable = true)
 |-- Collection: double (nullable = true)
 |-- EFFI(%): double (nullable = true)

+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|Zone Name| Main Category|        Sub Category|    Year|        Sale|      Demand|  Collection|EFFI(%)|
+---------+--------------+--------------------+--------+------------+------------+------------+-------+
|PUNE ZONE| HT-INDUSTRIAL|    HT-IND.(EXPRESS)|FY 12-13| 1,253.5944 | 1,020.3831 |   967.3706 |105.48%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY 12-13|        -   |        -   |     0.0692 |  0.00%|
|PUNE ZONE| HT-INDUSTRIAL|HT-IND.(NON-EXPRESS)|FY 12-13| 1,508.0498 | 1,131.4004 | 1,101.6759 |102.70%|
|PUNE ZONE|P.D. CONSUMERS|     HT-PD CONSUMERS|FY

In [51]:
dfPaymentWithSchema.createOrReplaceTempView("billing")
spark.sql("""
    SELECT Year as d, sum(Sale) as revenue
    FROM billing
    GROUP by d
    order by revenue desc
""").show()

+--------+------------------+
|       d|           revenue|
+--------+------------------+
|FY 12-13|2509.7799999999993|
|FY 13-14|2404.7116000000005|
|    null|              null|
+--------+------------------+

