In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, DateType, LongType, StructField, TimestampType
from pyspark.sql.functions import udf, col, unix_timestamp, to_utc_timestamp, from_unixtime, to_date, dayofweek, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, weekofyear

In [2]:
spark = SparkSession \
    .builder \
    .appName("Practice_for_a_notebook") \
    .getOrCreate()

In [3]:
log_data = spark.read.json("data/log-data/*.json" )
# i have to cast the data types
# _c means casted
log_data = log_data.withColumn("itemInSession_c", log_data["itemInSession"].cast(IntegerType())).drop("itemInSession")
log_data = log_data.withColumn("sessionId_c", log_data["sessionId"].cast(IntegerType())).drop("sessionId")
log_data = log_data.withColumn("status_c", log_data["status"].cast(IntegerType())).drop("status")


log_data = log_data.withColumn("as_date_C", to_utc_timestamp(from_unixtime(col("ts")/1000,'yyyy-MM-dd HH:mm:ss'),'EST')).drop("ts")


#df = log_data.withColumn("timestamp",unix_timestamp(log_data.ts, 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()))
log_data = log_data.withColumn("userId_C", log_data["userId"].cast(IntegerType())).drop("userId")

In [4]:
# before reading the file i had to specify my schema (explicit schema definintion)

song_types = StructType([
    StructField("artist_id",StringType()),
    StructField("artist_latitude",StringType()),
    StructField("artist_location",StringType()),
    StructField("artist_longitude",StringType()),
    StructField("artist_name",StringType()),
    StructField("duration",DoubleType()),
    StructField("num_songs",IntegerType()),
    StructField("song_id",StringType()),
    StructField("title",StringType()),
    StructField("year",IntegerType())
])

song_data = spark.read.json("data/song-data/song_data/*/*/*/*.json", schema = song_types)

# All Log Data

In [5]:
log_data.limit(100).toPandas().head(10)#.sort_values(by = "song").head(5)
#log_data.filter(log_data.song == "Floating").show()#.toPandas().head()#.loc[1]['song']

Unnamed: 0,artist,auth,firstName,gender,lastName,length,level,location,method,page,registration,song,userAgent,itemInSession_c,sessionId_c,status_c,as_date_C,userId_C
0,Harmonia,Logged In,Ryan,M,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,Sehr kosmisch,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",0,583,200,2018-11-15 05:30:26,26.0
1,The Prodigy,Logged In,Ryan,M,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,The Big Gundown,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",1,583,200,2018-11-15 05:41:21,26.0
2,Train,Logged In,Ryan,M,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,Marry Me,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2,583,200,2018-11-15 05:45:41,26.0
3,,Logged In,Wyatt,M,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,0,563,200,2018-11-15 06:57:51,9.0
4,,Logged In,Austin,M,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,0,521,200,2018-11-15 08:29:37,12.0
5,Sony Wonder,Logged In,Samuel,M,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,Blackbird,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",0,597,200,2018-11-15 08:44:09,61.0
6,,Logged In,Samuel,M,Gonzalez,,free,"Houston-The Woodlands-Sugar Land, TX",GET,About,1540493000000.0,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1,597,200,2018-11-15 08:44:20,61.0
7,,Logged Out,,,,,paid,,PUT,Login,,,,0,602,307,2018-11-15 10:34:34,
8,,Logged In,Tegan,F,Levine,,paid,"Portland-South Portland, ME",GET,Home,1540794000000.0,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1,602,200,2018-11-15 10:37:57,80.0
9,Van Halen,Logged In,Tegan,F,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,Best Of Both Worlds (Remastered Album Version),"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2,602,200,2018-11-15 10:48:55,80.0


# All song Data

In [6]:
song_data.limit(71).toPandas().head(5)#.sort_values(by = "title").head(20)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


# User Tbale

In [7]:
user_table_column_list = ["userId_C","firstName","lastName","gender","level"]
user_table_df = log_data.select(*user_table_column_list).distinct()
user_table_df.show(2)

+--------+---------+--------+------+-----+
|userId_C|firstName|lastName|gender|level|
+--------+---------+--------+------+-----+
|      20|    Aiden| Ramirez|     M| paid|
|       4|   Alivia| Terrell|     F| free|
+--------+---------+--------+------+-----+
only showing top 2 rows



# Song Table

In [8]:
song_table_column_list = ["song_id","title","artist_id","year","duration"]
song_table_df = song_data.select(*song_table_column_list).distinct()
song_table_df.count()
song_table_df.orderBy(song_table_df.title).limit(10).show(5)
#song_table_df.select(song_table_df.title).limit(12).orderBy(song_table_df.title).show(70,truncate = False)
#song_table_df.filter(song_table_df.title == "The Big Gundown").show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOFFKZS12AB017F194|A Higher Place (A...|ARBEBBY1187B9B43DB|1994|236.17261|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00771|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SORAMLE12AB017C8B0|      Auguri Cha Cha|ARHHO3O1187B989413|   0|191.84281|
|SOZHPGD12A8C1394FE|     Baby Come To Me|AR9AWNF1187B9AB0B4|   0|236.93016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



# artist_table

In [9]:
artist_table_column_list = ["artist_id","artist_name","artist_location","artist_longitude","artist_latitude"]
artist_table_df = song_data.select(*artist_table_column_list).distinct()
artist_table_df.show(2)

+------------------+---------------+---------------+----------------+---------------+
|         artist_id|    artist_name|artist_location|artist_longitude|artist_latitude|
+------------------+---------------+---------------+----------------+---------------+
|ARAJPHH1187FB5566A|The Shangri-Las|     Queens, NY|       -73.83168|        40.7038|
|AR0IAWL1187B9A96D0|   Danilo Perez|         Panama|       -80.11278|         8.4177|
+------------------+---------------+---------------+----------------+---------------+
only showing top 2 rows



# Time table

In [10]:
time_table = log_data.\
withColumn("star_time", date_format(log_data.as_date_C,"HH:mm:ss")).\
withColumn('hour',date_format(log_data.as_date_C ,"h")).\
withColumn('day',date_format(log_data.as_date_C ,"d")).\
withColumn('week',weekofyear(log_data.as_date_C)).\
withColumn('month',month(log_data.as_date_C)).\
withColumn('year',year(log_data.as_date_C)).\
withColumn('week_day',dayofweek(log_data.as_date_C)).\
select("star_time","hour","day","week","month","year","week_day")
time_table.show(2)

+---------+----+---+----+-----+----+--------+
|star_time|hour|day|week|month|year|week_day|
+---------+----+---+----+-----+----+--------+
| 05:30:26|   5| 15|  46|   11|2018|       5|
| 05:41:21|   5| 15|  46|   11|2018|       5|
+---------+----+---+----+-----+----+--------+
only showing top 2 rows



In [11]:
#To create the song play table i will use spark SQl
log_data.createOrReplaceTempView("log_table")
song_table_df.createOrReplaceTempView("song_table")
#time_table.createOrReplaceTempView("time_table")

#song_play

In [12]:
# first we need to define a udf function to do the join job need for the songplay table
# for simplicity i will make a dictionary with key that equals to the song name and it's values will be the song_id and artist_id
import time
list_of_song_id = []
#unique = set()
filter_song_table_df  = song_table_df.select("song_id","artist_id","title")
def joiner(song_name):
    """THIS FUNCTION SEARCHES FOR A SONG_NAME IN THE SONG_DATA
        AND RETURNS THE SONG_ID , ARTIST_ID TO APPEND THEM TO A LIST THAT CONTAINS THE SOND_ID AND ARTIST_ID
    """
    list_of_song_id.append(filter_song_table_df.filter(filter_song_table_df.title == "{}".format(song_name)).\
                           select("song_id","artist_id").toPandas().values.tolist())

#t0 = time.time()
songs_from_log_data = [ x for x in log_data.rdd.map(lambda x: x[11]).collect() if x != None]
#print("after list comprehension" , time.time()-t0)
print(len(songs_from_log_data))
c = time.time()
counter =0
for i in songs_from_log_data:
    if counter >100:
        break
    else :
        joiner(i)
        counter +=1


print(time.time()-c)

# i may use the song_table directly to get the song_id and artist_id but i will 
#make the song dictionary so that whenever i find the key i delete it 


6820


KeyboardInterrupt: 

In [None]:
song_name = "A Higher Place (Album Version)"
song_table_df.filter(song_table_df.title == "{}".format(song_name)).select("song_id","artist_id").toPandas().values.tolist()

In [13]:
song_play_column_list = ["as_date_C","userId_C","level","sessionId_c","location","userAgent"]



songplay_table = log_data.select(*song_play_column_list)


# casting the time stamp to be time only
songplay_table = songplay_table.withColumn("star_time", date_format(songplay_table.as_date_C,"HH:mm:ss")).drop(songplay_table.as_date_C)

songplay_table = songplay_table.withColumn("song_id", (song_table_df.as_date_C,"HH:mm:ss")).drop(songplay_table.as_date_C)

songplay_table.show(2)



#songplay_table = log_data.\
#withColumn("start_time", date_format(log_data.as_date_C , "HH:mm:ss")).\
#withColumn("user_id", log_data.userId_C).\


AttributeError: 'DataFrame' object has no attribute 'as_date_C'

In [None]:
spark.sql("""SELECT star_time FROM time_table t JOIN DEPT d ON e.emp_dept_id == d.dept_id""") \
.show(truncate=False)

In [91]:
log_data.select(log_data.song).toPandas().values.tolist()[:10]

[['Sehr kosmisch'],
 ['The Big Gundown'],
 ['Marry Me'],
 [None],
 [None],
 ['Blackbird'],
 [None],
 [None],
 [None],
 ['Best Of Both Worlds (Remastered Album Version)']]

In [13]:
b_tolist=log_data.rdd.map(lambda x: x[0]).collect()
type(b_tolist)

list

In [18]:
b_tolist[:10]
#print(len(b_tolist))

['Harmonia',
 'The Prodigy',
 'Train',
 None,
 None,
 'Sony Wonder',
 None,
 None,
 None,
 'Van Halen']

In [36]:
w = [None , "one","two"]
y = [ f for f in w if f != None]
y

['one', 'two']

# trying join

In [14]:
# apply inner join 
joined_df = log_data.join(song_data, log_data.artist == song_data.artist_name, 'inner')

# selecting columns from the joined dataframe
song_play_column_list = ["songplay_id","as_date_C","userId_C","level","song_id","artist_id","sessionId_c","location","userAgent"]

#adding  a column for the songplay id column using a built in function (monotonically_increasing_id)
joined_df = joined_df.withColumn("songplay_id",(monotonically_increasing_id()+1))

#appying the select statement to pick up only considered columns
joined_df = joined_df.select(*song_play_column_list)

#extracting the time from the timestamp column (as_data_C) and dropping the old column as_data_C
joined_df = joined_df.withColumn("star_time", date_format(joined_df.as_date_C,"HH:mm:ss")).drop("as_date_C")


# renaming columns
joined_df = joined_df.withColumnRenamed("userId_C","user_id").\
withColumnRenamed("sessionId_c","session_id")


# rearranging column positions
song_play_column_list = ["songplay_id","star_time","user_id","level","song_id","artist_id","session_id","location","userAgent"]


joined_df = joined_df.select(*song_play_column_list).show(1)



#joined_df.show(1)

#joined_df.select("id").show(5)

+-----------+---------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|star_time|user_id|level|           song_id|         artist_id|session_id|            location|           userAgent|
+-----------+---------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          1| 01:32:47|     44| paid|SOBONFF12A6D4F84D8|ARIK43K1187B9AE54C|       619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|
+-----------+---------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
only showing top 1 row



In [16]:
spark.stop()

In [17]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [20]:
spark.stop()