# Project: Data Lake with Spark

## Import Modules

In [32]:
import configparser
import os
import pyspark.sql.functions as F

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import udf, col, monotonically_increasing_id

## Create Spark Session

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

## Set AWS credentials

In [3]:
config = configparser.ConfigParser()

config.read_file(open('aws/dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

print(os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"])

'AKIAJTEM4RBXSBL4HZHA' 'SHHl+kMGF5JqpSC13DZwKJsy2aqgy17tt3gMY1ob'


#### Test AWS S3 Access

## Data Source

In [4]:
input_data = "data/"
output_data = "output/"

## Process Song data

#### Get filepath to song data file

In [5]:
song_data = os.path.join(input_data, "song_data/*/*/*/")

#### Read song data file

In [6]:
song_df = spark.read.json(song_data)

In [7]:
song_df.count()

71

#### Create songs table

In [8]:
songs_table = song_df.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()
songs_table.show(5, False)

+------------------+---------------------------------------------------+------------------+----+---------+
|song_id           |title                                              |artist_id         |year|duration |
+------------------+---------------------------------------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|¿Dónde va Chichi?                                  |ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas                               |ARMBR4Y1187B9990EB|0   |241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                    |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|Pink World                                         |AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Around On Me (Taco Hell)|ARDNS031187B9924F0|2005|186.48771|
+------------------+---------------------------------------------------+------------------+----+---------+
only showing top 5 rows



In [9]:
songs_table.write.parquet(os.path.join(output_data, 'songs'), mode='overwrite', partitionBy=['year','artist_id'])

#### Create artists table

In [10]:
artists_table = song_df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).distinct()
artists_table.show(5, truncate = False)

+------------------+---------------+---------------+---------------+----------------+
|artist_id         |artist_name    |artist_location|artist_latitude|artist_longitude|
+------------------+---------------+---------------+---------------+----------------+
|AR3JMC51187B9AE49D|Backstreet Boys|Orlando, FL    |28.53823       |-81.37739       |
|AR0IAWL1187B9A96D0|Danilo Perez   |Panama         |8.4177         |-80.11278       |
|ARWB3G61187FB49404|Steve Morse    |Hamilton, Ohio |null           |null            |
|AR47JEX1187B995D81|SUE THOMPSON   |Nevada, MO     |37.83721       |-94.35868       |
|ARHHO3O1187B989413|Bob Azzam      |               |null           |null            |
+------------------+---------------+---------------+---------------+----------------+
only showing top 5 rows



In [11]:
artists_table.write.parquet(os.path.join(output_data, 'artists'), mode='overwrite', partitionBy=['artist_id'])

## Process Log data

#### Get filepath to log data file

In [12]:
log_data = os.path.join(input_data, 'log_data/*/*/*.json')

#### Read log data file

In [13]:
log_df = spark.read.json(log_data)
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [14]:
log_df.count()

8056

#### filter by actions for song plays

In [15]:
log_df = log_df.where(log_df.page == 'NextSong')
log_df.count()

6820

In [16]:
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

### Create users table

In [17]:
users_table = log_df.select(["userId", "firstName", "lastName", "gender", "level"]).distinct()
users_table.show(5, truncate = False)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|57    |Katherine|Gay     |F     |free |
|84    |Shakira  |Hunt    |F     |free |
|22    |Sean     |Wilson  |F     |free |
|52    |Theodore |Smith   |M     |free |
|80    |Tegan    |Levine  |F     |paid |
+------+---------+--------+------+-----+
only showing top 5 rows



In [18]:
users_table.write.parquet(os.path.join(output_data, 'users'), mode='overwrite', partitionBy = ['userId'])

### Create time table

#### Create timestamp column from original "ts" column

In [19]:
log_df = log_df.withColumn('timestamp',( (log_df.ts.cast('float')/1000).cast("timestamp")) )
log_df.select('timestamp').show(3, False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:29:39.712|
|2018-11-15 00:40:35.072|
|2018-11-15 00:44:57.216|
+-----------------------+
only showing top 3 rows



In [20]:
from pyspark.sql.types import TimestampType

get_timestamp = udf(lambda t: datetime.fromtimestamp(t/1000), TimestampType())
log_df = log_df.withColumn("timestamp", get_timestamp(col("ts")))

In [21]:
log_df.select('timestamp').show(3, False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:30:26.796|
|2018-11-15 00:41:21.796|
|2018-11-15 00:45:41.796|
+-----------------------+
only showing top 3 rows



In [22]:
get_datetime = udf(lambda x: to_date(x), TimestampType())
log_df = log_df.withColumn("datetime", get_timestamp(col("ts")))

In [23]:
log_df.select('datetime').show(3, False)

+-----------------------+
|datetime               |
+-----------------------+
|2018-11-15 00:30:26.796|
|2018-11-15 00:41:21.796|
|2018-11-15 00:45:41.796|
+-----------------------+
only showing top 3 rows



#### Create **time_table**

In [24]:
time_table = log_df.select(F.col('datetime').alias('start_time'),
                       F.year('datetime').alias('year'),
                       F.month('datetime').alias('month'),
                       F.dayofmonth('datetime').alias('day'),
                       F.hour('datetime').alias('hour'),
                       F.minute('datetime').alias('min'),
                       F.second('datetime').alias('sec'),
                       F.weekofyear('datetime').alias('week'),
                       F.date_format('datetime', 'u').alias('weekday')
                       )

time_table.show(5, False)

+-----------------------+----+-----+---+----+---+---+----+-------+
|start_time             |year|month|day|hour|min|sec|week|weekday|
+-----------------------+----+-----+---+----+---+---+----+-------+
|2018-11-15 00:30:26.796|2018|11   |15 |0   |30 |26 |46  |4      |
|2018-11-15 00:41:21.796|2018|11   |15 |0   |41 |21 |46  |4      |
|2018-11-15 00:45:41.796|2018|11   |15 |0   |45 |41 |46  |4      |
|2018-11-15 03:44:09.796|2018|11   |15 |3   |44 |9  |46  |4      |
|2018-11-15 05:48:55.796|2018|11   |15 |5   |48 |55 |46  |4      |
+-----------------------+----+-----+---+----+---+---+----+-------+
only showing top 5 rows



### Create song play table

#### Read Song data file

In [25]:
song_df = spark.read.json(input_data+'song_data/*/*/*/*.json')

In [26]:
song_df.show(1)

+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|               |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
+------------------+---------------+---------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
only showing top 1 row



In [27]:
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+--------------------+--------------------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|           timestamp|            datetime|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+--------------------+--------------------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|2018-11-15 00:30:...|
+--------+--

#### Join song_df abd log_df

In [28]:
song_log_joined_table = log_df.join(song_df, (log_df.song == song_df.title) & (log_df.artist == song_df.artist_name) & (log_df.length == song_df.duration), how='inner')

In [29]:
song_log_joined_table.show(1)

+------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------+------+-------------+--------------------+------+--------------------+--------------------+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------+----+
|artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|          song|status|           ts|           userAgent|userId|           timestamp|            datetime|         artist_id|artist_latitude|artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|         title|year|
+------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------+------+-------------+--------------------+------+--------------------+------

#### Extract columns to create songplays table

In [29]:
songplays_table = song_log_joined_table.distinct() \
                    .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
                    .withColumn("songplay_id", F.row_number().over( Window.partitionBy("timestamp").orderBy("timestamp"))) \
                    .withColumnRenamed("userId","user_id")        \
                    .withColumnRenamed("timestamp","start_time")  \
                    .withColumnRenamed("sessionId","session_id")  \
                    .withColumnRenamed("userAgent", "user_agent")
                    

songplays_table.show(5)

+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|user_id|          start_time|           song_id|         artist_id|level|session_id|            location|          user_agent|songplay_id|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+
|     15|2018-11-21 21:56:...|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| paid|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|          1|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+-----------+



In [33]:
song_df = song_df['artist_id', 'artist_name', 'artist_location', 'song_id', 'title']
song_df.show(1)

+------------------+--------------------+---------------+------------------+--------------------+
|         artist_id|         artist_name|artist_location|           song_id|               title|
+------------------+--------------------+---------------+------------------+--------------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|               |SOBAYLL12A8C138AF9|Sono andati? Fing...|
+------------------+--------------------+---------------+------------------+--------------------+
only showing top 1 row



In [34]:
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+--------------------+--------------------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|           timestamp|            datetime|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+--------------------+--------------------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|2018-11-15 00:30:...|
+--------+--

In [37]:
songplays_table = log_df.join(song_df, song_df.artist_name == log_df.artist, "inner").distinct() \
                      .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent") \
                      .withColumn("songplay_id", monotonically_increasing_id()) \
                      .withColumnRenamed("timestamp","start_time") \
                      .withColumnRenamed("userId","user_id") \
                      .withColumnRenamed("sessionId","session_id") \
                      .withColumnRenamed("userAgent","user_agent")

In [38]:
songplays_table.show(5)

+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+------------+
|user_id|          start_time|           song_id|         artist_id|level|session_id|            location|          user_agent| songplay_id|
+-------+--------------------+------------------+------------------+-----+----------+--------------------+--------------------+------------+
|     44|2018-11-10 09:40:...|SOFFKZS12AB017F194|ARBEBBY1187B9B43DB| paid|       350|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|188978561024|
|     29|2018-11-26 17:57:...|SOQVMXR12A81C21483|ARKULSX1187FB45F84| paid|       924|Atlanta-Sandy Spr...|"Mozilla/5.0 (Mac...|197568495616|
|     80|2018-11-24 12:42:...|SOBONFF12A6D4F84D8|ARIK43K1187B9AE54C| paid|       903|Portland-South Po...|"Mozilla/5.0 (Mac...|214748364800|
|    101|2018-11-14 20:16:...|SORRZGD12A6310DBC3|ARVBRGZ1187FB4675A| free|       603|New Orleans-Metai...|"Mozilla/5.0 (Win...|369367187456|
|     24|2018

## Write to parquet files locally

In [39]:
songs_table.write.parquet(output_data + 'songs/' + 'artists.parquet')
artists_table.write.parquet(output_data + 'artists/' + 'artists.parquet')
users_table.write.parquet(output_data + 'users/' + 'artists.parquet')
time_table.write.parquet(output_data + 'time/' + 'artists.parquet')
songplays_table.write.parquet(output_data + 'songplays/' + 'artists.parquet')

## Create AWS S3 Bucket

In [40]:
import boto3

bucket_root='nd-data-engineering'
s3 = boto3.resource('s3', region_name='us-west-2')

In [44]:
try:
        s3.create_bucket(ACL='private',Bucket=bucket_root,
                             CreateBucketConfiguration={'LocationConstraint':'us-west-2'})
except Exception as e:
        if 'BucketAlreadyOwnedByYou' in str(e):
            print(f'{bucket_root} already exists')
        else:
           raise e

ClientError: An error occurred (InvalidAccessKeyId) when calling the CreateBucket operation: The AWS Access Key Id you provided does not exist in our records.

In [42]:
bucket = s3.Bucket(bucket_root)
folders_name = ['songs','songplays','time','artists','users']

# Create folders for the tables
for f in folders_name:
    fold_name = f'{f}/'
    try: 
        bucket.put_object(Key=fold_name)
    except Exception as e:
        raise e
print('Folders created')

ClientError: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.

In [45]:
bucket = s3.Bucket(bucket_root  )

for obj in bucket.objects.all():
    print(obj.key)

ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjects operation: The AWS Access Key Id you provided does not exist in our records.

## Write to AWS S3 Bucket

In [None]:
output_data  = "s3a://udacity-de-sparkify-data-lake/" 

songplays_table.write.parquet(output_data + 'songplays/' + 'songplays.parquet')
songs_table.write.parquet(output_data + 'songs/' + 'songs.parquet')
artists_table.write.parquet(output_data + 'artists/' + 'artists.parquet')
users_table.write.parquet(output_data + 'users/' + 'users.parquet')
time_table.write.parquet(output_data + 'time/' + 'time.parquet')