In [None]:
#!pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2

In [1]:
from pyspark.sql import SparkSession
import os
import pyspark
import configparser
import pyspark.sql.functions as F

In [2]:
# Load AWS credentials from the [AWS] section of dl.cfg and export them as
# environment variables (presumably for the `!aws` CLI calls and S3 access below).
config = configparser.ConfigParser()
# `with` closes the file handle. read_file() still raises if dl.cfg is missing,
# unlike ConfigParser.read(), which silently skips unreadable files.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
# Alternative way to pull hadoop-aws onto the classpath when launching pyspark from a plain kernel:
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'

In [3]:
# Record which AWS CLI version is available in this environment.
!aws --version

aws-cli/1.16.17 Python/3.6.3 Linux/4.15.0-1083-gcp botocore/1.12.7


In [4]:
# Start (or reuse) the Spark session with hadoop-aws on the classpath so the
# s3a:// reads below can resolve.
# Session observed when this notebook was run: Spark v2.4.3, master local[*],
# app name pyspark-shell.
# NOTE(review): the commented launch hints in this notebook use hadoop-aws 2.7.2
# while this uses 2.7.0; hadoop-aws normally has to match the Hadoop version
# Spark was built against — confirm.
# NOTE(review): spark.jars.packages presumably has no effect if a SparkContext is
# already running when getOrCreate() is called (jars are fixed at JVM start).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [5]:
%%time
# Load every JSON event-log file under log_data/2018/11. No schema is passed,
# so spark.read.json infers it from the files.
df_events_raw = spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")

CPU times: user 8.32 ms, sys: 502 µs, total: 8.82 ms
Wall time: 38.6 s


In [6]:
def transform_events(df_events_raw):
    """Split raw event-log records into the time, users and songplays-proto frames.

    Only rows with ``page == 'NextSong'`` are kept. ``ts`` is divided by 1000
    before ``from_unixtime``, i.e. it is treated as epoch milliseconds.

    Args:
        df_events_raw: Spark DataFrame of raw log events. It must contain the
            columns page, ts, userId, firstName, lastName, gender, level,
            sessionId, song, artist, location and userAgent.

    Returns:
        Tuple ``(time_df, users_df, songplays_proto_df)``:

        * time_df: distinct (start_time, hour, day, week, month, year, weekday).
        * users_df: distinct (user_id, first_name, last_name, gender, level).
          session_id is intentionally excluded. Including it produced one row
          per user session instead of one per user.
          TODO: a user seen with more than one ``level`` still yields one row
          per level.
        * songplays_proto_df: one row per NextSong event with song, artist,
          start_time, user_id, level, session_id, location, user_agent. It
          still needs song_id / artist_id resolved from the song data.
    """
    df_events = df_events_raw.filter(F.col('page') == 'NextSong')

    # from_unixtime() renders whole seconds, so sub-second precision of ts is dropped.
    start_time = F.to_timestamp(F.from_unixtime(F.col("ts") / 1000, 'yyyy-MM-dd HH:mm:ss'))

    dt = (
        df_events
        .withColumn("start_time", start_time)
        .withColumn("hour", F.hour("start_time"))
        .withColumn("day", F.dayofmonth("start_time"))
        .withColumn("week", F.weekofyear("start_time"))
        .withColumn("month", F.month("start_time"))
        .withColumn("year", F.year("start_time"))
        .withColumn("weekday", F.date_format("start_time", "EEEE"))  # full day name
    )

    time_df = dt.select("start_time", "hour", "day", "week", "month", "year", "weekday").distinct()

    users_df = dt.select(
        F.col("userId").alias("user_id"),
        F.col("firstName").alias("first_name"),
        F.col("lastName").alias("last_name"),
        F.col("gender"),
        F.col("level"),
    ).distinct()

    songplays_proto_df = dt.select(
        F.col("song"),
        F.col("artist"),
        F.col("start_time"),
        F.col("userId").alias("user_id"),
        F.col("level"),
        F.col("sessionId").alias("session_id"),
        F.col("location"),
        F.col("userAgent").alias("user_agent"),
    )

    return time_df, users_df, songplays_proto_df

In [7]:
%%time
# Build the time, users and songplays-proto DataFrames from the raw events.
time_df, users_df, songplays_proto_df = transform_events(df_events_raw)

CPU times: user 35.4 ms, sys: 5.03 ms, total: 40.4 ms
Wall time: 1.98 s


In [8]:
# List the S3 buckets visible to these credentials and the CLI's configured region.
!aws s3 ls
!aws configure get region

2021-05-03 18:03:41 aws-emr-resources-365869683794-us-east-2
2021-05-17 13:37:25 course-data-lake-proj
2021-05-17 13:41:04 course-datalake-prj
2021-04-01 15:16:17 my-dataengaws-course-bucket


In [9]:
# Inspect the top-level prefixes of the project bucket.
!aws s3 ls s3://course-datalake-prj/

                           PRE bootstrap/
                           PRE emr-logs/
                           PRE log_data/
                           PRE song_data/


In [None]:
#arn:aws:s3:::aws-emr-resources-365869683794-us-east-2
#s3://my-dataengaws-course-bucket/bootstraps/

In [10]:
%%time
'''
CPU times: user 38.2 ms, sys: 3.47 ms, total: 41.7 ms
Wall time: 3min 44s
'''
# does NOT work............
#df_songs_raw = spark.read.json("s3a://udacity-dend/song_data/A/*/*/*.json") takes too long time to execute
#df_songs_raw = spark.sparkContext.broadcast(spark.read.json("s3a://udacity-dend/song_data/A/B/*/*.json"))

#df_songs_raw = spark.read.json("s3a://course-datalake-prj/song_data/A/A/*/*.json")# 604

#https://course-datalake-prj.s3.us-east-2.amazonaws.com/song_data/A/A/A/TRAAAAK128F9318786.json
#s3://course-datalake-prj/song_data/A/A/A/
df_songs_raw = spark.read.json("s3a://udacity-dend/song_data/A/A/*/*.json")# 604

CPU times: user 38.4 ms, sys: 3.51 ms, total: 42 ms
Wall time: 3min 52s


In [11]:
def transform_songs(df_songs_raw):
    """Split raw song-metadata records into the songs, artists and lookup frames.

    Args:
        df_songs_raw: Spark DataFrame of song metadata. It must contain the
            columns song_id, title, artist_id, year, duration, artist_name,
            artist_location, artist_latitude and artist_longitude.

    Returns:
        Tuple ``(songs_df, artists_df, songplays_proto2_df)``, each de-duplicated
        with ``distinct()``:

        * songs_df: song_id, title, artist_id, year, duration.
        * artists_df: artist_id, name, location, latitude, longitude.
          TODO: an artist whose attributes differ between songs still yields
          more than one row per artist_id.
        * songplays_proto2_df: artist_name, title, artist_id, song_id. This is
          the lookup used to attach song_id / artist_id to log events.
    """
    songs_df = df_songs_raw.select("song_id", "title", "artist_id", "year", "duration").distinct()

    # Rename the artist_* source columns to artists-table names. The column is
    # spelled "latitude" (previously the typo "lattitude" ended up in the schema).
    artists_df = df_songs_raw.select(
        F.col("artist_id"),
        F.col("artist_name").alias("name"),
        F.col("artist_location").alias("location"),
        F.col("artist_latitude").alias("latitude"),
        F.col("artist_longitude").alias("longitude"),
    ).distinct()

    songplays_proto2_df = df_songs_raw.select("artist_name", "title", "artist_id", "song_id").distinct()

    return songs_df, artists_df, songplays_proto2_df

In [12]:
%%time
# Build the songs, artists and song-lookup DataFrames from the raw song metadata.
songs_df, artists_df, songplays_proto2_df = transform_songs(df_songs_raw)

CPU times: user 14.2 ms, sys: 741 µs, total: 15 ms
Wall time: 160 ms


In [13]:
# Resolve song_id / artist_id for each NextSong event by matching the event's
# song title and artist name against the song lookup.
# The condition is passed to join() explicitly instead of a condition-less join
# followed by .where(), and plain == is used: null-safe equality would pair
# events with a null song/artist against songs with a null title/artist_name.
song_match = (
    (songplays_proto_df.song == songplays_proto2_df.title)
    & (songplays_proto_df.artist == songplays_proto2_df.artist_name)
)
# Inner join: events whose song is not in the loaded song subset are dropped
# (16 matches with the A/A subset).
# NOTE(review): a left join would keep every NextSong event with null
# song_id/artist_id — confirm which the songplays table should contain.
joined_df = songplays_proto_df.join(songplays_proto2_df, on=song_match, how="inner")

# Unique but not consecutive ids (per monotonically_increasing_id docs).
joined_dfp = joined_df.withColumn("songplay_id", F.monotonically_increasing_id())

songplays_cols = ["songplay_id", "start_time", "user_id", "level", "song_id", "artist_id", "session_id", "location", "user_agent"]
songplays_df = joined_dfp.select(*songplays_cols)

In [None]:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#joined_dfp.limit(5).toPandas()
#joined_dfp.printSchema()

In [14]:
# Derive the year/month partition columns from start_time so songplays can be
# written with partitionBy("year", "month").
songplays_df = (
    songplays_df
    .withColumn("year", F.year("start_time"))
    .withColumn("month", F.month("start_time"))
)


In [None]:
# Root directory for all analytics tables. Change it in one place to write
# somewhere else (e.g. an s3a:// bucket path instead of local disk).
OUTPUT_ROOT = "/home/workspace/analytics"

# (DataFrame, path under OUTPUT_ROOT, partition columns); written in this order.
tables_to_write = [
    (songplays_df, "songplays/songplays.parquet", ["year", "month"]),
    (songs_df, "songs/songs.parquet", ["year", "artist_id"]),
    (time_df, "time/time.parquet", ["year", "month"]),
    (users_df, "users/users.parquet", []),
    (artists_df, "artists/artists.parquet", []),
]

for table_df, rel_path, partition_cols in tables_to_write:
    writer = table_df.write.mode("overwrite")
    # Only call partitionBy for partitioned tables, matching the original writes.
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.parquet(OUTPUT_ROOT + "/" + rel_path)

In [None]:
#!aws s3 cp /home/workspace/analytics/ s3://course-datalake-prj/analytics/ --recursive