# `Import Necessary Libraries`

In [None]:
# Notebook setup: make the local Spark install importable, then import the
# libraries used throughout the notebook.
import os
import glob
import findspark
# SPARK_HOME comes from the environment (None if the variable is unset).
SPARK_HOME = os.getenv("SPARK_HOME")
# Keep this before the pyspark import below: findspark is what makes the local
# Spark install's pyspark package importable.
findspark.init(SPARK_HOME)

# IPython magic: render matplotlib figures inline in the notebook.
%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime 
from pyspark.sql import SparkSession

In [None]:
# Reuse the active SparkSession if one exists, otherwise build one with default settings.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load every JSON log file from the local "log-data" directory into one DataFrame
# (the schema is inferred by Spark).
filepath = "log-data"
log_df = spark.read.json(filepath)

In [None]:
# Inspect the schema Spark inferred from the log JSON files.
log_df.printSchema()

In [None]:
# Preview the first five log rows as a pandas DataFrame.
log_df.limit(5).toPandas()

In [None]:
# Expose the logs to Spark SQL under the name "log_df" (later cells query it),
# then keep only the timestamps of song-play ("nextsong") events.
log_df.createOrReplaceTempView("log_df")
ts_col = log_df.where("lower(page) = 'nextsong'").select("ts")


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

@udf(MapType(StringType(), StringType()))
def convert_ts(ts):
    """
    Split a millisecond epoch timestamp into the fields of the time table.

    Parameter:
            ts: millisecond timestamp taken from the 'ts' column of the log data.

    Returns:
            A dict with the keys 'start_time' (the original ts), 'hour', 'day',
            'week' (ISO week of year), 'month', 'year' and 'weekday'
            (Monday == 0). The UDF is declared as map<string, string>.
            Returns None (a SQL NULL) when ts is missing or cannot be converted.
    """
    # NOTE(review): datetime.fromtimestamp() uses the executor's local time zone,
    # so hour/day/etc. depend on where the job runs. Kept for consistency with the
    # other fromtimestamp-based conversions in this notebook.
    try:
        # The conversion is the only step that can fail (None, out-of-range values),
        # so it lives inside the try; a bad ts yields NULL instead of failing the task.
        t = datetime.fromtimestamp(ts / 1000)
    except (TypeError, ValueError, OverflowError, OSError) as e:
        print(e)
        return None

    return {
        "start_time": ts,
        "hour": t.hour,
        "day": t.day,
        "week": t.isocalendar()[1],
        "month": t.month,
        "year": t.year,
        "weekday": t.weekday(),
    }

In [None]:
# Apply the convert_ts UDF to every timestamp; parsed_ts holds a map of time fields.
ts_col = ts_col.withColumn("parsed_ts", convert_ts("ts"))

In [None]:
# Confirm that parsed_ts was added as a map<string, string> column.
ts_col.printSchema()

In [None]:
# Widen pandas' column display so the parsed_ts maps are not truncated.
pd.set_option("max_colwidth", 200)
ts_col.limit(5).toPandas()

In [None]:
# One Spark SQL expression per time field: read the key out of the parsed_ts map
# and alias the resulting column with the same name.
fields = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
exprs = list(map("parsed_ts['{0}'] as {0}".format, fields))
exprs

In [None]:
# Make the time table by turning each parsed_ts key into its own column.
# Duplicates are dropped in the next cell, so this count is the raw row count
# (deduplicating here as well made that cell a no-op and cost an extra shuffle).
time_t = ts_col.selectExpr(*exprs)
time_t.count()

In [None]:
# Keep one copy of each repeated row and show how many distinct rows remain.
time_t = time_t.distinct()
time_t.count()

In [None]:
# Save the time table locally as CSV; pandas' to_csv also writes the row index
# as the first column by default.
time_t.toPandas().to_csv("time.csv") # save to csv

## `Extracting User Table`

In [None]:
# NOTE(review): describe() only builds a summary-statistics DataFrame; the values
# are computed and shown only after an action such as .show() or .toPandas().
log_df.describe()

In [None]:
# Users dimension (target columns: user_id, first_name, last_name, gender, level).
# One row per distinct combination of the five selected columns.
user_t = log_df.select("userId", "firstName", "lastName", "gender", "level").distinct()
user_t.limit(5).toPandas()

In [None]:
# Expose the users table to Spark SQL for the duplicate-user checks below.
user_t.createOrReplaceTempView("user_t")

# Finds userIds that occur in more than one row of user_t. Because user_t is
# distinct over all five columns, a user whose level (or name) differs between
# events shows up more than once.
# spark.sql("""
#     SELECT userId, count(*) as time_shown from user_t
#     GROUP BY userId
#     HAVING time_shown >1
# """).toPandas()

In [None]:
# Inspect specific userIds (presumably ones the duplicate query above flagged)
# plus rows with an empty userId.
spark.sql("""
    SELECT  * from user_t
   WHERE userId = 15 or userId = 29 or userId = 85 or userId = ""
""").toPandas()

# Loading song-data into a DataFrame
Here, I define an explicit schema for the data so that each column gets the correct data type instead of relying on Spark's type inference. This is a step worth taking whenever the data will end up in a relational database.

In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType

# Explicit column types for the song JSON files, listed in the same order as
# the files' fields; every column is nullable (StructField's default).
song_fields = [
    ("artist_id", StringType()),
    ("artist_latitude", DoubleType()),
    ("artist_location", StringType()),
    ("artist_longitude", DoubleType()),
    ("artist_name", StringType()),
    ("duration", DoubleType()),
    ("num_songs", IntegerType()),
    ("song_id", StringType()),
    ("title", StringType()),
    ("year", IntegerType()),
]
songSchema = StructType([StructField(name, dtype) for name, dtype in song_fields])

# Concatenating DataFrames
This step is not needed (the compiled code below uses a simpler method), but it shows another way to load data from several directories and how to concatenate DataFrames in Spark.

In [None]:
# `reduce` is not a builtin in Python 3; without this import the union below
# raises NameError.
from functools import reduce

# Local copy of the song data (hard-coded path on the author's machine).
filepath = r"C:\DiT\CS\Data Engineer\Module 3 - Data lake\Data lake project\song-data"

# Collect the absolute path of every .json file in every directory under the root.
all_files = [
    os.path.abspath(path)
    for root, _dirs, _names in os.walk(filepath)
    for path in glob.glob(os.path.join(root, '*.json'))
]
if not all_files:
    # reduce() over an empty list would only fail with an opaque TypeError.
    raise FileNotFoundError(f"No .json files found under {filepath!r}")

# Load each file as its own DataFrame using the explicit song schema.
dfs = [spark.read.json(path, schema=songSchema) for path in all_files]

# Union the per-file DataFrames into a single DataFrame.
song_df = reduce(lambda first, second: first.union(second), dfs)

In [None]:
# Check that the combined DataFrame carries the explicit songSchema.
song_df.printSchema()

In [None]:
# Preview the first five song records.
song_df.limit(5).toPandas()

## `Extracting Song Table`

In [None]:
# Songs dimension: keep only the columns the songs table needs.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
song_t = song_df.select(*song_columns)
song_t.limit(5).toPandas()

In [None]:
# Re-check the song schema (a describe() call is left commented out).
# song_df.describe()
song_df.printSchema()

## `Extracting Artist Table for the song-data`
``` spark
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
```

In [None]:
# Artists dimension (target: artist_id, name, location, latitude, longitude).
# The source columns keep their artist_* names here.
artist_columns = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
artist_t = song_df.select(*artist_columns)
artist_t.limit(5).toPandas()

## `Extracting the songplay table`

In [None]:
# Look at the log rows again before building the songplays fact table.
log_df.limit(5).toPandas()

In [None]:
# Create Spark temp views so the DataFrames can be queried like tables in SQL

In [None]:
# Target songplays columns: songplay_id, start_time, user_id, level, session_id,
# location, user_agent, song_id, artist_id.
# Register each DataFrame as a temp view named after its variable so the cells
# below can query it through spark.sql.
# Tables built so far: user_t, song_t, time_t, artist_t.
views = {
    "log_df": log_df,
    "song_t": song_t,
    "artist_t": artist_t,
    "song_df": song_df,
}
for view_name, frame in views.items():
    frame.createOrReplaceTempView(view_name)

song_df.describe()

In [None]:
# Exploratory: try matching log events to songs on track length alone.
# NOTE(review): exact equality on floating-point columns (length vs duration) only
# matches when both values are stored identically, and many songs can share a length.
spark.sql("""
                SELECT l.ts, l.userId, l.level, l.userAgent, l.sessionId, l.location, s.song_id, s.artist_id
                FROM log_df l
                LEFT JOIN song_df  s ON l.length = s.duration
                LIMIT 5
""").toPandas()

In [None]:
# Songplays fact table: one row per distinct NextSong event, with song_id and
# artist_id looked up from the song data when the track can be matched.
# The join uses artist name AND song title: joining on the artist alone paired
# every play with every song by that artist, inflating the table.
# NOTE(review): assumes the log data has a `song` column holding the track
# title; confirm with log_df.printSchema().
songplays = spark.sql("""
                SELECT DISTINCT 
                            l.ts as start_time, 
                            l.userId as user_id, 
                            l.level, 
                            s.song_id, 
                            s.artist_id, 
                            l.sessionId as session_id,
                            l.location,
                            l.userAgent as user_agent                           
                FROM 
                            log_df l
                LEFT JOIN
                            song_df s ON l.artist = s.artist_name
                                     AND l.song = s.title
                WHERE  lower(l.page) = "nextsong"
""")
songplays.count()

In [None]:
# NOTE(review): time_table is only assigned by the pipeline cell at the end of
# the notebook; run that cell first, otherwise this raises NameError.
time_table.limit(5).toPandas()

In [None]:
# Row counts of every table produced by the pipeline cell at the end of the
# notebook (run that cell first so these names exist).
summary = (
    f"songplay = {songplays_table.count()}\n"
    f"users = {users_table.count()} \n"
    f"songs = {song_table.count()} \n"
    f"artists = {artist_table.count()}\n"
    f"time = {time_table.count()}"
)
print(summary)

# `Code compilation`
Putting the code together into one script was painful. I advise assembling it incrementally, checking that each piece works as you debug.

In [None]:
import pandas as pd
import configparser
import os
import findspark
from datetime import datetime

# Get spark location on PC. findspark.init must run before the pyspark imports
# below so that the local Spark install's pyspark is importable.
SPARK_HOME = os.getenv("SPARK_HOME")
findspark.init(SPARK_HOME)

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import MapType, StructType, StructField, DoubleType, IntegerType, StringType, TimestampType


# AWS credentials are kept in a local config file rather than in the code.
CONFIG_PATH = r"C:\DiT\CS\Data Engineer\Module 3 - Data lake\dl.cfg"
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list it
# did read. Fail here with a clear message instead of a bare KeyError on config['AWS'].
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Could not read AWS config file: {CONFIG_PATH}")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    """
    Build (or reuse) the SparkSession used by the pipeline.

    The session is named "Using Spark on S3" and pulls in the
    org.apache.hadoop:hadoop-aws:2.7.2 package via spark.jars.packages,
    presumably for S3 access.

    Returns
    -------
    SparkSession
        The active session.
    """
    builder = SparkSession.builder.appName("Using Spark on S3")
    builder = builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.2")
    session = builder.getOrCreate()
    print("SparkSession Created!")
    return session


@udf(MapType(StringType(), StringType()))
def convert_ts(ts):
    """
    Split a millisecond epoch timestamp into the fields of the time table.

    Parameter:
            ts: millisecond timestamp taken from the 'ts' column of the log data.

    Returns:
            A dict with the keys 'start_time' (the original ts), 'hour', 'day',
            'week' (ISO week of year), 'month', 'year' and 'weekday'
            (Monday == 0). The UDF is declared as map<string, string>.
            Returns None (a SQL NULL) when ts is missing or cannot be converted.
    """
    # NOTE(review): datetime.fromtimestamp() uses the executor's local time zone,
    # so hour/day/etc. depend on where the job runs. Kept for consistency with the
    # fromtimestamp-based month/year UDFs in process_log_data.
    try:
        # The conversion is the only step that can fail (None, out-of-range values),
        # so it lives inside the try; a bad ts yields NULL instead of failing the task.
        t = datetime.fromtimestamp(ts / 1000)
    except (TypeError, ValueError, OverflowError, OSError) as e:
        print(e)
        return None

    return {
        "start_time": ts,
        "hour": t.hour,
        "day": t.day,
        "week": t.isocalendar()[1],
        "month": t.month,
        "year": t.year,
        "weekday": t.weekday(),
    }


def process_song_data(spark, input_data, output_data):
    """
    Build the songs and artists dimension tables from the song JSON files.

    Parameters
    ----------
    spark: SparkSession
        The Spark session that has been created.
    input_data: str
        Root path of the input data. Song files are read from
        ``input_data + 'song_data/*/*/*/*.json'``, so it should end with a separator.
    output_data: str
        Directory the parquet tables are written under; existing output is overwritten.

    Returns
    -------
    tuple
        (song_df, songs_table, artists_table): the raw song DataFrame and the
        two dimension tables.
    """
    print("\nRunning process_song_data")
    # Explicit schema so Spark does not have to infer column types.
    song_schema = StructType([
        StructField("artist_id", StringType()),
        StructField("artist_latitude", DoubleType()),
        StructField("artist_location", StringType()),
        StructField("artist_longitude", DoubleType()),
        StructField("artist_name", StringType()),
        StructField("duration", DoubleType()),
        StructField("num_songs", IntegerType()),
        StructField("song_id", StringType()),
        StructField("title", StringType()),
        StructField("year", IntegerType()),
    ])

    # read every song file under the song_data tree
    song_data = input_data + 'song_data/*/*/*/*.json'
    song_df = spark.read.json(song_data, schema=song_schema)

    # songs table: distinct song rows
    songs_table = song_df.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id') \
        .parquet(os.path.join(output_data, 'songs/songs.parquet'), mode='overwrite')
    print("songs_table created and save out as parquet")

    # artists table: one row per artist_id. An artist with several songs can carry
    # different location/latitude/longitude values per song, so deduplicating on
    # all columns could emit the same artist_id more than once. Which of the
    # conflicting rows is kept is not specified.
    artists_table = song_df.select(
        "artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"
    ).dropDuplicates(["artist_id"])

    # write artists table to parquet files
    artists_table.write.parquet(os.path.join(output_data, 'artists/artists.parquet'), mode='overwrite')
    print("artists_table created and save out as parquet")

    return song_df, songs_table, artists_table
    
   


def process_log_data(spark, song_df, input_data, output_data):
    """
    Build the users, time and songplays tables from the log JSON files.

    Parameters
    ----------
    spark: SparkSession
        The Spark session that has been created.
    song_df: DataFrame
        Raw song data (as returned by process_song_data); joined to the logs to
        look up song_id and artist_id.
    input_data: str
        Root path of the input data. Log files are read from
        ``input_data + 'log_data/*.json'``.
    output_data: str
        Directory the parquet tables are written under; existing output is overwritten.

    Returns
    -------
    tuple
        (log_df, users_table, time_table, songplays_table), where log_df holds
        only the NextSong events plus the derived month and year columns.
    """
    # Window functions are only needed in this function, so import them locally.
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    # "\n" (not the invalid escape "\R") so the message starts on a new line,
    # matching process_song_data.
    print("\nRunning process_log_data")
    # get filepath to log data file
    log_data = input_data + 'log_data/*.json'

    # Only song-play events feed the analytics tables.
    log_df = spark.read.json(log_data)
    log_df = log_df.filter(log_df.page == "NextSong")

    # users table: one row per userId. A user's level can change between events,
    # so dropDuplicates() over all five columns emitted the same userId several
    # times; keep the attributes from each user's most recent event instead.
    latest_first = Window.partitionBy("userId").orderBy(col("ts").desc())
    users_table = (
        log_df.withColumn("_row", row_number().over(latest_first))
        .filter(col("_row") == 1)
        .select("userId", "firstName", "lastName", "gender", "level")
    )

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, 'users/users.parquet'), mode='overwrite')
    print("users_table created and save out as parquet")

    # time table: split each timestamp into its parts with the convert_ts UDF.
    # Deduplicating ts first means the UDF runs once per distinct timestamp.
    ts_col = log_df.select("ts").dropDuplicates()
    ts_col = ts_col.withColumn("parsed_ts", convert_ts("ts"))

    # Turn each key of the parsed_ts map into its own column.
    fields = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
    exprs = [f"parsed_ts['{field}'] as {field}" for field in fields]
    time_table = ts_col.selectExpr(*exprs).dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month') \
        .parquet(os.path.join(output_data, 'time/time.parquet'), mode='overwrite')
    print("time_table created and save out as parquet")

    # month / year columns used to partition songplays; like convert_ts they use
    # datetime.fromtimestamp, so both tables derive dates the same way.
    get_month = udf(lambda x: datetime.fromtimestamp(x / 1000).month)
    log_df = log_df.withColumn("month", get_month(log_df.ts))

    get_year = udf(lambda x: datetime.fromtimestamp(x / 1000).year)
    log_df = log_df.withColumn("year", get_year(log_df.ts))

    # songplays table: match each play to the song data by artist name AND song
    # title. Joining on the artist alone paired every play with every song by
    # that artist.
    # NOTE(review): assumes the log data has a `song` column holding the track
    # title; confirm with log_df.printSchema().
    song_df.createOrReplaceTempView("song_df")
    log_df.createOrReplaceTempView("log_df")

    songplays_table = spark.sql("""
                    SELECT DISTINCT 
                                l.ts as start_time, 
                                l.userId as user_id, 
                                l.level, 
                                s.song_id, 
                                s.artist_id, 
                                l.sessionId as session_id,
                                l.location,
                                l.userAgent as user_agent,
                                l.year,
                                l.month                               
                    FROM 
                                log_df l
                    LEFT JOIN
                                song_df s ON l.artist = s.artist_name
                                         AND l.song = s.title
                    """)

    songplays_table.write.partitionBy('year', 'month') \
        .parquet(os.path.join(output_data, 'songplays/songplays.parquet'), mode='overwrite')
    print("songplays_table created and save out as parquet")

    return log_df, users_table, time_table, songplays_table



# Execute tasks

In [None]:
"""
Perform the following roles:
1.) Get or create a spark session.
2.) Read the song and log data from s3.
3.) Take the data and transform it into tables,
which will then be written to parquet files.
4.) Load the parquet files on s3.
"""

spark = create_spark_session()
# Use "/" as the separator: Spark accepts it on Windows and Linux alike. The old
# r"data\\" was two literal backslash characters, which is not a path separator
# outside Windows.
input_data = "data/"
# input_data = "s3a://udacity-dend/"  # S3 source; hadoop-aws provides the s3a:// filesystem
output_data = "out"

song_df, song_table, artist_table = process_song_data(spark, input_data, output_data)
log_df, users_table, time_table, songplays_table = process_log_data(spark, song_df, input_data, output_data)
