<a href="https://colab.research.google.com/github/Kondwani7/Pyspark-basics/blob/main/pyspark_sql.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=64cb5964124c5064b6c30a51aa046714037d0396358bc2cff7546957a9774826
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount= True)

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [5]:
spark = SparkSession \
      .builder \
      .appName('SQL spark data wrangling') \
      .getOrCreate()


In [6]:
path = '/content/drive/MyDrive/practice datasets/sparkify_log_small.json'
user_log = spark.read.json(path)

In [7]:
user_log.take(5)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth='Logged In', firstNa

In [8]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
#create a temp view that will be used to run sql queries
user_log.createOrReplaceTempView("user_log_table")
#SQL query
spark.sql("SELECT * FROM user_log_table LIMIT 5").show()


+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shreveport-Bossie...|   PUT

sample queries

In [21]:
#get first 5 queries
spark.sql("SELECT artist, auth FROM user_log_table LIMIT 5").show()


+--------------------+---------+
|              artist|     auth|
+--------------------+---------+
|       Showaddywaddy|Logged In|
|          Lily Allen|Logged In|
|Cobra Starship Fe...|Logged In|
|          Alex Smoke|Logged In|
|                null|Logged In|
+--------------------+---------+



In [22]:
#get number of artists
spark.sql("SELECT COUNT(artist) FROM user_log_table").show()


+-------------+
|count(artist)|
+-------------+
|         8347|
+-------------+



In [24]:
#get number of users
spark.sql("SELECT COUNT(userID) FROM user_log_table").show()

+-------------+
|count(userID)|
+-------------+
|        10000|
+-------------+



In [25]:
#get daa on a specific user
spark.sql("SELECT firstname, lastName, level, gender, song FROM user_log_table WHERE userId=='1046'").collect()

[Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Christmas Tears Will Fall'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Be Wary Of A Woman'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Public Enemy No.1'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Reign Of The Tyrants'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Father And Son'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='No. 5'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Seventeen'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song=None),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='War on war'),
 Row(firstname='Kenneth', lastName='Matthews', level='paid', gender='M', song='Killermont Street'),
 Row(firstname='Kenneth', lastNam

In [26]:
#get the different pages of the company website
spark.sql('''
          SELECT DISTINCT page 
          FROM user_log_table
          ORDER BY page ASC
          ''').show()



+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



#User Defined Functions

In [28]:
#mapping the songs listen to at each time stamp in hours
spark.udf.register("get_hour", lambda x : int(datetime.datetime.fromtimestamp(x / 1000.0).hour))


<function __main__.<lambda>>

In [31]:
#songs listen to at each time stamp in hours
spark.sql('''
          SELECT *, get_hour(ts) AS hour
          FROM user_log_table
          LIMIT 5
          ''').show()



+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|hour|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|  22|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shrevep

In [34]:
#get songs played at each hour
songs_in_hour = spark.sql('''
                          SELECT get_hour(ts) AS hour, COUNT(*) as plays_per_hour
                          FROM user_log_table
                          WHERE page = "NextSong"
                          GROUP BY hour
                          ORDER BY cast(hour as int) ASC
                          ''')



In [35]:
songs_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           456|
|   1|           454|
|   2|           382|
|   3|           302|
|   4|           352|
|   5|           276|
|   6|           348|
|   7|           358|
|   8|           375|
|   9|           249|
|  10|           216|
|  11|           228|
|  12|           251|
|  13|           339|
|  14|           462|
|  15|           479|
|  16|           484|
|  17|           430|
|  18|           362|
|  19|           295|
+----+--------------+
only showing top 20 rows



In [41]:
#convert to a pandas dataframe
songs_in_hour_pd = songs_in_hour.toPandas()
print(songs_in_hour_pd)

   hour  plays_per_hour
0     0             456
1     1             454
2     2             382
3     3             302
4     4             352
5     5             276
6     6             348
7     7             358
8     8             375
9     9             249
10   10             216
11   11             228
12   12             251
13   13             339
14   14             462
15   15             479
16   16             484
17   17             430
18   18             362
19   19             295
20   20             257
21   21             248
22   22             369
23   23             375


In [45]:
peak_plays = max(songs_in_hour_pd['plays_per_hour'])
print(peak_plays)

484


In [47]:
#number of female users
spark.sql('''
          SELECT COUNT(DISTINCT userID) 
          FROM user_log_table
          WHERE gender = 'F'
          ''').show()


+----------------------+
|count(DISTINCT userID)|
+----------------------+
|                   462|
+----------------------+



In [49]:
#how many songs where played by the top 5 favorite artists by song plays
spark.sql('''
          SELECT Artist, COUNT(Artist) AS plays
          FROM user_log_table
          GROUP BY Artist
          ORDER BY plays DESC
          LIMIT 5
          ''').show()



+--------------------+-----+
|              Artist|plays|
+--------------------+-----+
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|            BjÃÂ¶rk|   46|
|       Dwight Yoakam|   45|
+--------------------+-----+

