In [1]:
from pyspark.sql import SparkSession
import os
import configparser
import pyspark.sql.functions as F
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('../dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"] =     config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']
# os.environ["AWS_SESSION_TOKEN"] =     config['AWS']['AWS_SESSION_TOKEN']
# os.environ['AWS_DEFAULT_REGION'] =    "us-east-1"#config['AWS']['AWS_DEFAULT_REGION']

In [2]:
spark = SparkSession.builder\
                     .master("spark://pop-os.localdomain:7077")\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.3")\
                     .getOrCreate()
sc = spark.sparkContext
config = sc._jsc.hadoopConfiguration()
config.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com")

In [5]:
bucket_path = "s3a://myawsbucket4321a"

In [6]:
songplays_df = spark.read.parquet(bucket_path + "/songplays.parquet").alias("songplays")

AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'

In [7]:
df_events = spark.read.json("/home/paul/Projects/DataEngineering/Spark/data/log_data/*.json")
df_events = df_events = df_events.withColumn("timestamp", F.to_timestamp(F.from_unixtime(F.col("ts") / 1000)) )
df_events.createOrReplaceTempView("staging_events")

In [10]:
df_songs = spark.read.json("/home/paul/Projects/DataEngineering/Spark/data/song_data/*/*/*/*.json")
df_songs.createOrReplaceTempView("staging_songs")

In [11]:
df_events.count()

8056

In [12]:
df_songs.count()

71

In [14]:
# spark.sql("select * from staging_events").limit(5).toPandas()
df_events = df_events = df_events.withColumn("timestamp", F.to_timestamp(F.from_unixtime(F.col("ts") / 1000)) )
df_events.createOrReplaceTempView("staging_events")

8056

In [17]:
df_events.limit(5).show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-14 19:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|S

In [11]:
df_events.write.parquet("s3a://myawsbucket4321a/users.parquet",mode="overwrite")

In [8]:
df_events.count()

8056

In [9]:
df_songs.count()

1

In [None]:
df_songs = spark.read.json("s3a://udacity-dend/song_data/*/*/*/*.json")

In [None]:
df_songs.count()

In [None]:
df_events.where(F.col("page") == "NextSong").sort(F.col("song").asc()).limit(100).limit(5).toPandas()

In [None]:
df_songs.count()

In [None]:
spark.sql("select * from staging_songs").limit(5).toPandas()

In [None]:
from pyspark.sql.types import *


songplays_schema = StructType(
    [
        StructField("songplay_id", StringType(), True),
        StructField("start_time", DoubleType(), True),
        StructField("user_id", StringType(), True),
        StructField("level", StringType(), True),
        StructField("song_id", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("session_id", StringType(), True),
        StructField("location", StringType(), True),
        StructField("user_agent", StringType(), True)
    ]
)
df_songplays = spark.createDataFrame([], songplays_schema)
df_songplays.createOrReplaceTempView("songplays")


users_schema = StructType(
    [
        StructField("user_id", StringType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("level", StringType(), True)
    ]
)
df_users = spark.createDataFrame([], songplays_schema)
df_users.createOrReplaceTempView("users")


songs_schema = StructType(
    [
        StructField("song_id", StringType(), True),
        StructField("title", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("year", ShortType(), True),
        StructField("duration", DoubleType(), True)
    ]
)
df_songs = spark.createDataFrame([], songs_schema)
df_songs.createOrReplaceTempView("songs")


artists_schema = StructType(
    [
        StructField("artist_id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("location", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True)
    ]
)
df_artists = spark.createDataFrame([], artists_schema)
df_artists.createOrReplaceTempView("artists")


time_schema = StructType(
    [
        StructField("start_time", TimestampType(), True),
        StructField("hour", IntegerType(), True),
        StructField("day", IntegerType(), True),
        StructField("month", IntegerType(), True),
        StructField("year", ShortType(), True),
        StructField("weekend", BooleanType(), True)
    ]
)
df_time = spark.createDataFrame([], time_schema)
df_time.createOrReplaceTempView("artists")



In [None]:
sub_frame = df_events[[
    "itemInSession", 
    "ts", 
    "userId", 
    "level",
    "artist",
    "song",
    "sessionId", 
    "location", 
    "userAgent"
]]# .limit(5).toPandas()

In [None]:
df_events[["ts"]]

In [None]:
new_frame = df_songplays.union(sub_frame)

In [None]:
new_frame.limit(5).toPandas()

In [None]:
spark.sql("select * from songs").toPandas()

## song table

In [None]:
spark.sql("""
SELECT DISTINCT(song_id), title, artist_id, year, duration 
    FROM staging_songs
""").distinct().toPandas()

## artist table

In [None]:
spark.sql("""
SELECT DISTINCT(artist_id), artist_name, artist_location, artist_latitude, artist_longitude
    FROM staging_songs
""").toPandas()

## user table

In [None]:
spark.sql("""
SELECT DISTINCT(userId) as user_id, firstName, lastName, gender, level
    FROM staging_events
    WHERE page ='NextSong'
""").toPandas()

## time table

In [None]:
added_timestamp = df_events.withColumn("timestamp", F.to_timestamp(F.from_unixtime(F.col("ts") / 1000)) )
added_timestamp.createOrReplaceTempView("staging_events")

In [None]:
def check_weekday(x):
    if x == 7 or x == 1:
        return False
    else:
        return True
import datetime
added_timestamp.select(
    F.col("timestamp").alias('start_time'),
    F.hour("timestamp").alias('hour'),
    F.dayofmonth("timestamp").alias('day'),
    F.weekofyear("timestamp").alias('week'),
    F.month("timestamp").alias('month'), 
    F.year("timestamp").alias('year'), 
    # F.dayofweek("timestamp").alias('day of week'),
    F.when(  F.dayofweek("timestamp") ==  7 , False).when(  F.dayofweek("timestamp") ==  1 , False).otherwise(True).alias("weekday")
).limit(5).toPandas()# .where(added_timestamp.timestamp > datetime.datetime(2018, 11, 5 )).show()

In [None]:
data=[["-1","2018-11-03"], ["0","2018-11-04"], ["1","2018-11-05"],["2","2018-11-06"],["3","2018-11-07"]]
df=spark.createDataFrame(data,["id","input"])
df = df.withColumn("timestamp", F.to_date(F.col("input"), "yyyy-MM-dd") )
df = df.withColumn("day_of_week", F.dayofweek(F.col("timestamp") ) )
df.show()

In [None]:
# df.select(F.col("input"), 
#     F.to_date(F.col("input"), "yyyy-MM-dd").alias("date_format"),
#     F.dayofweek("date_format").alias('day of week')
#   ).show()

In [None]:
lambda x : True if (x == 7 or x == 1) else False

In [None]:
df.toPandas()

In [None]:
# spark.sql("""
# SELECT timestamp FROM staging_events as ts,
#     EXTRACT(hour FROM staged_date)
# """).limit(5).toPandas()

In [None]:
# df_events.select(F.to_timestamp(F.from_unixtime( F.col("ts") / 1000 )).alias('time_stamp'))
# df_songs = spark.read.json("s3a://udacity-dend/song_data/A/B/Q/TRABQTA128F148D048.json")
#added_timestamp = df_events.withColumn("unix_timestamp", F.col("unix_timestamp_int").cast(TimestampType()))

In [None]:
df_events.printSchema()

In [None]:
df_events[["userId", "level", "song"]].toPandas()

## songplay table 

In [None]:
spark.sql("""
SELECT st.userId,
    st.level,
    s.song_id,
    s.artist_id,
    st.sessionId,
    st.location,
    st.userAgent
FROM staging_events st 
INNER JOIN staging_songs s ON s.title=st.song AND st.artist = s.artist_name
WHERE st.page = 'NextSong'
""").toPandas()

In [None]:
df_events[["userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent"]]

In [None]:
df = spark.sql("""
SELECT *
FROM staging_events
WHERE page='NextSong'
""").toPandas()

In [None]:
songs_final_df = spark.sql("""
SELECT *
FROM staging_events
""").toPandas()

In [None]:
df_events.limit(5).toPandas()

In [None]:
df_events.toJSON("./test.out")

In [None]:
df_events.coalesce(1).write.format('json').save('./test.out')

In [None]:
df_events.write.parquet("s3a://paul-spark/songs.parquet",mode="overwrite")