# ***Project 3: Data Lake***

## STEP 0: Configration : 

In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config["AWS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config["AWS"]['AWS_SECRET_ACCESS_KEY']

## STEP 1: Creating Spark Session : 

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

## STEP2: LOAD DATA FROM S3 : 

In [5]:
input_data="s3a://udacity-dend/"
output_data="s3a://project4-datalake"

## STEP 2: Proccesing Song Data : 

In [23]:
def process_song_data(spark, input_data, output_data):
     '''
    This function process song data by:
    First: Reading the data 
    Second:Extract the needed coulmns(songs and artist) to be used in
    the dimontional modeling tables
    Third: write the tables to parquet files partitioned by year and artist for the song table. 
    
    Args: 
    spark= the spark session 
    input_data=the URL of data in S3.
    output_data=the URL for the written data on S3 after processing

    '''
    # get filepath to song data file
    song_data ="data/song-data/A/B/C/*.json"
    # read song data file
    df =spark.read.json(song_data)
    
    #create a view to use with SQL queries
    df.createOrReplaceTempView("song_data_table")
    
    # extract columns to create songs table
    songs_table = spark.sql(
                            """
                             SELECT DISTINCT song_id,
                                             title,
                                             artist_id, 
                                             year,
                                             duration
                                             FROM song_data_table 
                             """)
     # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy("year", "artist_id").parquet('songs/')
      # extract columns to create artists table
    artists_table =spark.sql("""
                                SELECT DISTINCT artist_id, 
                                                artist_name,
                                                artist_location,
                                                artist_latitude,
                                                artist_longitude
                                FROM song_data_table 
                            
                            """)
    
    # write artists table to parquet files
    artists_table.write.parquet('artists/')
    

IndentationError: unexpected indent (<ipython-input-23-6b087cda41af>, line 3)

## STEP3: Procesing Log Data: 

In [None]:
def process_log_data(spark, input_data, output_data):
    '''
    This function process song data by:
    First: Reading the data 
    Second:Extract the needed coulmns to be used in
    the dimontional modeling tables
    Third: write the tables to parquet files.
    Args: 
    spark= the spark session 
    input_data=the URL of data in S3.
    output_data=the URL for the written data on S3 after processing

    '''
    # get filepath to log data file
    log_data ="data/log-data/*.json"

    # read log data file
    df =spark.read.json(log_data)
    # filter by actions for song plays
    df =df.filter(df.page == 'NextSong')
     #create a view to use with SQL queries
    df.createOrReplaceTempView("log_data_table")
    # extract columns for users table    
    users_table = spark.sql(
                            """
                             SELECT DISTINCT userId,
                                             firstName,
                                             lastName, 
                                             gender,
                                             level
                                             FROM log_data_table
                             """)
     # write users table to parquet files
    users_table.write.parquet('users/')
#      # create timestamp column from original timestamp column
#     get_timestamp = udf()
#     df = 
    
#     # create datetime column from original timestamp column
#     get_datetime = udf()
#     df = 
##THE ABOVE STEPS ARE IGNORED BECAUSE I'LL USE SQL. 
    
    # extract columns to create time table
    time_table = spark.sql(""" SELECT  DISTINCT temp.start_time,
                                       hour( temp.start_time) as hour,
                                       dayofmonth( temp.start_time) as day,
                                       weekofyear( temp.start_time) as week,
                                       month( temp.start_time) as month,
                                       year(temp.start_time) as year,
                                       dayofweek(temp.start_time) as weekday
                             FROM   (SELECT to_timestamp(ts/1000) as start_time
                                     FROM log_data_table 
                                    ) temp
                             """)
    
    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year", " month").parquet('time/')
    # read in song data to use for songplays table
    song_df = spark.read.parquet('songs/')
     # extract columns from joined song and log datasets to create songplays table 
    songplays_table = spark.sql(""" SELECT  DISTINCT 
                                            monotonically_increasing_id() as songplay_id,
                                            start_time,
                                            userId,
                                            song_id,
                                            artist_id,
                                            sessionId,
                                            location,
                                            userAgent
                                    FROM time_table 
                                    JOIN song_df s
                                    ON  time_table.song = song_df.title
                                    AND time_table.artist = song_df.artist_name
                                    AND time_table.length = song_df.duration
                                 """)


    # write songplays table to parquet files partitioned by year and month
    songplays_table..partitionBy("year", "month").parquet('songplays_table/')

In [6]:
 #try
    # read in song data to use for songplays table
spark=create_spark_session()
log_data ="data/log-data/*.json"

    # read log data file
df =spark.read.json(log_data)
df =df.filter(df.page == 'NextSong')
     #create a view to use with SQL queries
df.createOrReplaceTempView("log_data_table")
song_df = spark.read.parquet('songs/')
     # extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql(""" SELECT  DISTINCT 
                                            monotonically_increasing_id() as songplay_id,
                                            to_timestamp(ts/1000) as start_time,
                                            userId,
                                            song_id,
                                            artist_id,
                                            sessionId,
                                            location,
                                            userAgent
                                    FROM log_data_table 
                                    JOIN song_data_table 
                                    ON  log_data_table.song = song_data_table.title
                                    AND log_data_table.artist = song_data_table.artist_name
                                    AND log_data_table.length = song_data_table.duration
                                 """)


    # write songplays table to parquet files partitioned by year and month
songplays_table.partitionBy("year", "month").parquet('songplays_table/')

AnalysisException: 'Table or view not found: time_table; line 10 pos 39'

In [9]:
#TRY
spark=create_spark_session()
log_data ="data/log-data/*.json"

    # read log data file
df =spark.read.json(log_data)
df.printSchema()   
df =df.filter(df.page == 'NextSong')
     #create a view to use with SQL queries
df.createOrReplaceTempView("log_data_table")
# temp_table=spark.sql(""" SELECT to_timestamp(ts/1000) as start_time
#                          FROM log_data_table 
#                          WHERE ts IS NOT NULL
#                      """)
# temp_table.show()
time_table = spark.sql(""" SELECT  DISTINCT temp.start_time,
                                       hour( temp.start_time) as hour,
                                       dayofmonth( temp.start_time) as day,
                                       weekofyear( temp.start_time) as week,
                                       month( temp.start_time) as month,
                                       year(temp.start_time) as year,
                                       dayofweek(temp.start_time) as weekday
                             FROM   (SELECT to_timestamp(ts/1000) as start_time
                                     FROM log_data_table 
                                    ) temp
                             """)
    
    # write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet('time4/')
song_df = spark.read.parquet('songs/')
     # extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql(""" SELECT  DISTINCT 
                                            monotonically_increasing_id() as songplay_id,
                                            to_timestamp(ts/1000) as start_time,
                                            userId,
                                            song_id,
                                            artist_id,
                                            sessionId,
                                            location,
                                            userAgent
                                    FROM log_data_table
                                    JOIN song_df s
                                    ON  time_table.song = song_df.title
                                    AND time_table.artist = song_df.artist_name
                                    AND time_table.length = song_df.duration
                                 """)


    # write songplays table to parquet files partitioned by year and month
songplays_table.partitionBy("year", "month").parquet('songplays_table/')

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



AnalysisException: 'Table or view not found: song_df; line 11 pos 41'

In [6]:
#TRY
spark=create_spark_session()
log_data ="data/log-data/*.json"

    # read log data file
df =spark.read.json(log_data)
df.printSchema()   

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
spark=create_spark_session()
song_data ='song_data/A/B/C/*.json'
#     # read song data file
df =spark.read.json(song_data)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [31]:
# #TRY 
# spark=create_spark_session()
# song_data ='song_data/A/B/C/*.json'
#     # read song data file
# df =spark.read.json(song_data)
# df.printSchema()
# #create a view to use with SQL queries
# df.createOrReplaceTempView("song_data_table")
# songs_table = spark.sql(
# """
# SELECT DISTINCT song_id,
#        title
#        artist_id, 
#        year,
#        duration
# FROM song_data_table 
# """)
#songs_table.write.partitionBy("year", "artist_id").parquet('songs/')
artists_table =spark.sql("""
                                SELECT DISTINCT artist_id, 
                                                artist_name,
                                                artist_location,
                                                artist_latitude,
                                                artist_longitude
                                FROM song_data_table 
                            
                            """)
    
    # write artists table to parquet files
artists_table.write.parquet('artists/')