# Exercise 3: Data Lake on S3

In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [None]:
#os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.0 pyspark-shell"

# Make sure that your AWS credentials are loaded as env vars

In [None]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

#os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']
KEY=config.get('AWS','AWS_ACCESS_KEY_ID')
SECRET= config.get('AWS','AWS_SECRET_ACCESS_KEY')
#print(KEY)

# Create spark session with hadoop-aws package

In [None]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3a.awsAccessKeyId", KEY)
hadoop_conf.set("fs.s3a.awsSecretAccessKey", SECRET)

In [None]:
# get filepath to song data file
import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

sampleDbBucket =  s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="song_data/"):
    print(obj)
    #key = obj.key
    #body = obj.get()['Body'].read()
    #print(body)

In [None]:
# read song data file
df=spark.read.json("s3a://udacity-dend/song_data/A/A/A/*.json")
df.printSchema()
df.select('song_id','title','artist_id','year','duration').show()
songs_table = df
columns_to_drop = ['artist_name','artist_location','artist_latitude', 'artist_longitude','num_songs']
songs_table = songs_table.drop(*columns_to_drop)
songs_table.select('song_id','title','artist_id','year','duration').dropDuplicates().collect()  
# write songs table to parquet files partitioned by year and artist
songs_table.write.format("parquet").mode("overwrite").save("data/output/songs.parquet")
#.partitionBy("year","artist_id")

In [None]:
#df.select('artist_id','artist_name','artist_location','artist_latitude', 'artist_longitude').dropDuplicates()
artists_table = df.select('artist_id','artist_name','artist_location','artist_latitude', 'artist_longitude')
#columns_to_drop = ['duration', 'num_songs','song_id','title','year']
#artists_table = artists_table.drop(*columns_to_drop)
#artists_table.printSchema()
artists_table.select('artist_id','artist_name','artist_location','artist_latitude', 'artist_longitude').dropDuplicates().collect()
artists_table.write.parquet("data/output/artists.parquet",mode='overwrite',compression='snappy')

In [None]:
# get filepath to song data file
#song_data = 
import boto3

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )

sampleDbBucket =  s3.Bucket("udacity-dend")

for obj in sampleDbBucket.objects.filter(Prefix="log_data/"):
    print(obj)    
    
for obj in sampleDbBucket.objects.filter(Prefix="log_data/2018/11/2018-11-01-events.json"):   
    key = obj.key
    body = obj.get()['Body'].read()
    print(body)

In [None]:
df=spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")
df.printSchema()
df.show(5)

In [None]:
# filter by actions for song plays
df1=df.filter(df.method=='GET')

In [None]:
df1.count()
df1.printSchema()
df1.show(5)

In [None]:
users_table = df.select('userId','firstName','lastName','gender','level')


users_table.select('userId','firstName','lastName','gender','level').dropDuplicates().collect()
users_table=users_table.withColumnRenamed('userId','user_id') \
                            .withColumnRenamed('firstName','first_name') \
                            .withColumnRenamed('lastName','last_name')                          
# write users table to parquet files    
users_table.write.parquet("data/output/users.parquet",mode='overwrite',compression='snappy')

In [None]:
#Find Duplicates in Columns
import pyspark.sql.functions as udf
users_table.agg(
    udf.count('user_id').alias('count'),
    udf.countDistinct('user_id').alias('distinct count')
).show()

In [None]:
#Drop duplicate rows using subset method
users_table=users_table.dropDuplicates(subset=[c for c in users_table.columns if c!='user_id'])
users_table.show()

In [None]:
# this will give in every rows how many columns has misisng values e.g. For the User_Id='' is having 4 columns out of which 3 cloumns doesn't have any value. 
users_table.rdd.map(
    lambda row:(row['user_id'],sum([c==None for c in row]))
).collect()

#users_table.where(col("user_id").isNull()).show()

In [43]:
# Percentage of missing values in each columns
users_table.agg(*[
    (1-(udf.count(c) /udf.count('*'))).alias(c+'_missing')
    for c in users_table.columns
]).show()

+---------------+--------------------+--------------------+--------------------+-------------+
|user_id_missing|  first_name_missing|   last_name_missing|      gender_missing|level_missing|
+---------------+--------------------+--------------------+--------------------+-------------+
|            0.0|0.018867924528301883|0.018867924528301883|0.018867924528301883|          0.0|
+---------------+--------------------+--------------------+--------------------+-------------+



In [44]:
# create timestamp column from original timestamp column
#get_timestamp = udf()  year, month, dayofmonth, hour, weekofyear, date_format
import  pyspark.sql.functions as udf
from pyspark.sql.functions import unix_timestamp
import pandas as pd

time_table = df
columns_to_drop = ['artist','auth','itemInSession','length','location','method', \
                   'page','registration','sessionId','song','status','userAgent', \
                   'userId','firstName','lastName','gender','level']
time_table = time_table.drop(*columns_to_drop)
time_table.printSchema()
time_table.select('ts').dropDuplicates().collect()
time_table = time_table.withColumn("ts",udf.to_timestamp(udf.from_unixtime(udf.col("ts")/1000))) \
           .withColumn("year", udf.year("ts")) \
           .withColumn("month", udf.month("ts")) \
           .withColumn("dayofmonth", udf.dayofmonth("ts")) \
           .withColumn("hour", udf.hour("ts")) \
           .withColumn("weekofyear", udf.weekofyear("ts")) \
           .withColumn("weekday", udf.dayofweek("ts")) \
           .withColumn("date_format", udf.date_format("ts",'MM/dd/yyy')) 
time_table.printSchema()
time_table['ts','year','month','dayofmonth','hour','weekofyear','weekday','date_format'].show(5)

root
 |-- ts: long (nullable = true)

root
 |-- ts: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- date_format: string (nullable = true)

+-------------------+----+-----+----------+----+----------+-------+-----------+
|                 ts|year|month|dayofmonth|hour|weekofyear|weekday|date_format|
+-------------------+----+-----+----------+----+----------+-------+-----------+
|2018-11-15 00:30:26|2018|   11|        15|   0|        46|      5| 11/15/2018|
|2018-11-15 00:41:21|2018|   11|        15|   0|        46|      5| 11/15/2018|
|2018-11-15 00:45:41|2018|   11|        15|   0|        46|      5| 11/15/2018|
|2018-11-15 01:57:51|2018|   11|        15|   1|        46|      5| 11/15/2018|
|2018-11-15 03:29:37|2018|   11|        15|   3|        46|      5| 11/15/2018

In [47]:
# create timestamp column from original timestamp column
#get_timestamp = udf()  year, month, dayofmonth, hour, weekofyear, date_format
import  pyspark.sql.functions as udf
from pyspark.sql.functions import unix_timestamp
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF
@pandas_udf('timestamp', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles

def millisToTimeStamp(t):
    return pd.to_datetime(t,unit='ms')

time_table = df['ts']
time_table.printSchema()
time_table=time_table.withColumn('ts',millisToTimeStamp(time_table['ts']))
time_table.printSchema()
time_table.select('ts').dropDuplicates().collect()

time_table = time_table.withColumn("ts",udf.to_timestamp(udf.from_unixtime(udf.col("ts")/1000))) \
           .withColumn("year", udf.year("ts")) \
           .withColumn("month", udf.month("ts")) \
           .withColumn("dayofmonth", udf.dayofmonth("ts")) \
           .withColumn("hour", udf.hour("ts")) \
           .withColumn("weekofyear", udf.weekofyear("ts")) \
           .withColumn("weekday", udf.dayofweek("ts")) \
           .withColumn("date_format", udf.date_format("ts",'MM/dd/yyy')) 
time_table.printSchema()
time_table['ts','year','month','dayofmonth','hour','weekofyear','weekday','date_format'].show(5)

ImportError: PyArrow >= 0.8.0 must be installed; however, it was not found.

In [None]:
song_df=spark.read.json("s3a://udacity-dend/song_data/A/A/A/*.json")
song_df.printSchema()
#song_df.show(5)

songplays_table = df
songplays_table.printSchema()
#songplays_table.show(5)

songsplay_final=songplays_table.join(song_df,songplays_table.song==song_df.title,'inner') \
        .drop('auth','itemInSession','method','page','registration','status','firstName','lastName','gender','artist_latitude', \
             'artist_location','artist_longitude','artist_name','duration','num_songs','year','song','artist','title')  

songsplay_final.show(5)

# Infer schema, fix header and separator

In [None]:
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", inferSchema=True, header=True)

In [None]:
df.printSchema()
df.show(5)

# Fix the data yourself 

In [None]:
import  pyspark.sql.functions as F
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5)

# Extract the month

In [None]:
dfPayment = dfPayment.withColumn("month", F.month("payment_date"))
dfPayment.show(5)

# Computer aggregate revenue per month

In [None]:
dfPayment.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month, sum(amount) as revenue
    FROM payment
    GROUP by month
    order by revenue desc
""").show()

# Fix the schema

In [None]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
paymentSchema = R([
    Fld("payment_id",Int()),
    Fld("customer_id",Int()),
    Fld("staff_id",Int()),
    Fld("rental_id",Int()),
    Fld("amount",Dbl()),
    Fld("payment_date",Date()),
])

In [None]:
dfPaymentWithSchema = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv",sep=";", schema=paymentSchema, header=True)


In [None]:
dfPaymentWithSchema.printSchema()
df.show(5)

In [None]:
dfPaymentWithSchema.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month(payment_date) as m, sum(amount) as revenue
    FROM payment
    GROUP by m
    order by revenue desc
""").show()