In [30]:
## Map Function in Spark
import pyspark
sc = pyspark.SparkContext(appName="myfirsttry") #Sparkcontext is the entry point for spark environment.
                                                #For every sparkapp you need to create the sparkcontext object.
data = ['myfirstry', 'willrock', 'mylittlebigfatworld']
distributed_data = sc.parallelize(data) 
distributed_data.map(lambda x:x.upper()).collect()

['MYFIRSTRY', 'WILLROCK', 'MYLITTLEBIGFATWORLD']

In [31]:
sc.stop()

In [32]:
#SparkConf and SparkSession
#Sparkconf is the class which gives you the various option to provide configuration parameters.
# Val Conf = new sparkConf().setMaster(“local[*]”).setAppName(“test”)
# Val SC  = new sparkContext(Conf) #spark configuration is passed to spark context. 
from pyspark import SparkConf

In [33]:
# SparkSession is an entry point to Spark and creating a SparkSession instance would be the 
# first statement you would write to program with RDD, 
# DataFrame and Dataset. SparkSession will be created using SparkSession.builder() builder patterns.
from pyspark.sql import SparkSession

In [34]:
spark = SparkSession.builder.appName("myfirstapp").getOrCreate()

In [None]:
spark.sparkContext.getConf().getAll()

In [36]:
spark

In [37]:
## Creating Spark DataFrame
path = "data/sparkify_log_small.json"

In [38]:
df = spark.read.json(path)

In [39]:
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [43]:
df.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

In [44]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [46]:
df.show(n=1)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------

In [47]:
df.select("artist").show()

+--------------------+
|              artist|
+--------------------+
|       Showaddywaddy|
|          Lily Allen|
|Cobra Starship Fe...|
|          Alex Smoke|
|                null|
|                null|
|              Redman|
|     Ulrich Schnauss|
|                null|
|                null|
|               Jay-Z|
|         Evanescence|
|     Scissor Sisters|
|        3 Doors Down|
|       George Younce|
|              Aly-Us|
|                null|
|            BjÃÂ¶rk|
|      David Bromberg|
|          Nickelback|
+--------------------+
only showing top 20 rows



In [None]:
df.count()

In [None]:
df.describe("artist").show()

In [None]:
df = df.dropna('how'=any, subset=['sessionid','userid'])

In [None]:
df.select("page").dropDuplicates().sort("page").show()

In [49]:
spark.stop()

In [55]:
### Starting Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mytinytinyapp').getOrCreate()
path = "data/sparkify_log_small.json"
df = spark.read.json(path)

In [80]:
## Use of UDF ( User Defined Functions)
from pyspark.sql.functions import udf
import datetime
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000.0). hour)
df1 = df.withColumn("hour", get_hour(df.ts))
df1.head()

In [97]:
## Use of Filter, groupby, count, order by in one query
songs_per_hour = df1.filter(df1.page=='NextSong').groupby('hour').count().orderBy(df1.hour.cast("float"))
df2=songs_per_hour.toPandas()

In [124]:
df1.select(['userId', 'sessionid', 'song']).where(df1.userId=='1138')

DataFrame[userId: string, sessionid: bigint, song: string]

In [119]:
## Use of Window functions
from pyspark.sql import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import count
screen = Window.partitionBy("userId").orderBy(desc('ts')).rangeBetween(Window.unboundedPreceding, 0)
df1 = df1.withColumn('newcolumn', count('song').over(screen))

In [136]:
### Question 1 : Which page did user id "" (empty string) NOT visit?
pages = df.select("page").dropDuplicates()
visited = df.select("page").dropDuplicates().where(df.userId == "")
for row in set(pages.collect())-set(visited.collect()):
    print(row)

Row(page='Error')
Row(page='About')
Row(page='NextSong')
Row(page='Downgrade')
Row(page='Logout')
Row(page='Settings')
Row(page='Submit Downgrade')
Row(page='Help')
Row(page='Upgrade')
Row(page='Save Settings')
Row(page='Submit Upgrade')
Row(page='Home')


In [137]:
## Question 2: What type of user does the empty string user id most likely refer to?
visited.show()

+-----+
| page|
+-----+
|Login|
+-----+



In [144]:
## Question 3: How many female users do we have in the data set?
df.filter(df.gender=='F').select('userId', 'sessionId').dropDuplicates().count()

675

In [153]:
## Question 4: How many songs were played from the most played artist?
df.filter(df.page=="NextSong").select('artist').groupBy('artist').agg({'artist':'count'})\
    .withColumnRenamed('count(Artist)', 'acount').sort(desc('acount')).show(1)

+--------+------+
|  artist|acount|
+--------+------+
|Coldplay|    83|
+--------+------+
only showing top 1 row



In [None]:
##How many songs do users listen to on average between visiting our home page? 
##Please round your answer to the closest integer.