<h2>SQL Problem</h2>

**Corrected SQL**

SELECT Levels.Level_Name, Players.Rank, COUNT(Players.Rank) AS num_players<br>
FROM Players<br> 
LEFT JOIN Levels ON Levels.Level_ID = Players.Level_ID<br>
GROUP BY Levels.Level_Name, Players.Rank;

Changes made:
- Selected only the relevant columns.
- Qualified each column name with its table name so that data is pulled from the correct table.
- Added Rank to the GROUP BY clause and corrected a typo in the Level_Name column name.
- Specified the required join type (LEFT JOIN).
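
The corrected query can be tried out on a small in-memory database. This is a minimal sketch using Python's built-in sqlite3 module; the table contents, the Player_ID column and the column types are made-up assumptions, since the real tables are not shown here. Rank is double-quoted only as a precaution, because it is a reserved word in some SQL dialects.

```python
import sqlite3

# Hypothetical toy data: the real Levels and Players tables are not shown in the source.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Levels (Level_ID INTEGER PRIMARY KEY, Level_Name TEXT);
    CREATE TABLE Players (Player_ID INTEGER PRIMARY KEY, Level_ID INTEGER, "Rank" TEXT);
    INSERT INTO Levels VALUES (1, 'Bronze'), (2, 'Silver');
    INSERT INTO Players VALUES
        (1, 1, 'A'), (2, 1, 'A'), (3, 1, 'B'), (4, 2, 'A'),
        (5, 3, 'B');  -- Level_ID 3 has no matching row in Levels
""")

# Same query as above, with Rank quoted.
query = """
    SELECT Levels.Level_Name, Players."Rank", COUNT(Players."Rank") AS num_players
    FROM Players
    LEFT JOIN Levels ON Levels.Level_ID = Players.Level_ID
    GROUP BY Levels.Level_Name, Players."Rank"
"""
rows = set(conn.execute(query).fetchall())
for row in sorted(rows, key=repr):
    print(row)
```

Because of the LEFT JOIN, the player whose Level_ID has no match in Levels still appears in the result, with a NULL (None in Python) level name.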


<h2>Linux Command Problem</h2>

***for a in $(yes | nl | head -50 | cut -f 1); do<br>
head -$(($a*2)) inputfile | tail -1 |<br>
awk 'BEGIN{FS="\t"}{print $2}' |<br>
xargs wget -c 2> /dev/null<br>
done***


- '|' pipes the output of one command into the next, so multiple commands can be chained together.
- 'yes' prints the string y endlessly, one per line, and 'nl' prefixes each line with its line number.
- 'head -50' keeps only the first 50 of those numbered lines.
- 'cut -f 1' keeps the first tab-separated field of each line, which is the line number, so the loop variable a takes the values 1 to 50.
- 'head -$(($a*2))' prints the first 2*a lines of inputfile.
- 'tail -1' keeps only the last of those lines, which is line 2*a of inputfile.
- 'awk 'BEGIN{FS="\t"}' sets the field separator to a tab before any input is read, so each line is split into tab-separated fields.
- 'awk{print $2;}' prints the second field.
- 'xargs' converts its standard input into arguments for the command that follows.
- 'wget -c' downloads a file from the web. The '-c' option continues a previous partial/interrupted download instead of starting again.
- '2> /dev/null' redirects standard error (file descriptor 2) to /dev/null, discarding any error messages.

1. First, the numbers 1 to 50 are generated, and the loop runs once for each value a.
2. Then, for each a, the first a*2 lines of the input file are read and only the last one is kept, so lines 2, 4, ..., 100 are selected in turn.
3. Each selected line is split into fields, using tab as the delimiter.
4. The second field is extracted, printed, then used as an argument for wget.
5. If the file already exists partially downloaded, the download resumes; otherwise the download starts from scratch.
6. Error messages are discarded for this procedure.
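
The selection logic of the loop can be emulated in a few lines of Python to check which lines it actually picks. This is only a sketch: the function name, the in-memory list of lines and the example.com URLs are assumptions for illustration, and the download step is left out.

```python
def select_second_fields(lines, iterations=50):
    """Emulate the shell loop on an in-memory list of lines (no download)."""
    selected = []
    for a in range(1, iterations + 1):
        head = lines[: a * 2]          # head -$(($a*2)) inputfile
        if not head:
            continue                   # empty input: nothing to select
        last = head[-1]                # tail -1
        fields = last.split("\t")      # awk 'BEGIN{FS="\t"}'
        # {print $2}: awk prints an empty string when the field is missing
        selected.append(fields[1] if len(fields) > 1 else "")
    return selected


# Hypothetical input: six tab-separated lines, with a URL in the second field.
sample = [f"id{i}\thttp://example.com/file{i}" for i in range(1, 7)]
urls = select_second_fields(sample, iterations=3)
print(urls)
```

With three iterations the emulation picks lines 2, 4 and 6 of the sample, which matches the "every second line" behaviour described in the steps above.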

**Simplified Linux Command**

- Replace the loop with a single pass over the input file. The loop reads lines 2, 4, ..., 100, so take the first 100 lines from the head of the input file: head -100 inputfile
- Chain with the awk command to keep only the even-numbered lines and get the second column: | awk 'BEGIN{FS="\t"} NR % 2 == 0 {print $2}'
- Use arguments one by one to download web files: | xargs -n 1 wget -c 2> /dev/null



<h2>PySpark Project</h2>

In [1]:
#Imports used
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import functions as F
from pyspark.sql.functions import countDistinct
import nest_asyncio
import tarfile
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
import datetime

In [2]:
#nest_asyncio allows pyspark to run error-free in jupyter notebook
#start spark session and set params
nest_asyncio.apply()
sc = SparkSession.builder.appName('PS_Data_Tar2').getOrCreate()

In [3]:
#Read data into Pyspark. I ended up using only the song dataset, but kept the user dataset code for reference
#userid \t timestamp \t musicbrainz-artist-id \t artist-name \t musicbrainz-track-id \t track-name were column names from README
column_headers_songs = ['userid', 'timestamp', 'musicbrainz-artist-id', 'artist-name', 'musicbrainz-track-id', 'track-name']
df_songs = sc.read.option('header', 'false').option('delimiter', '\t').csv('lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv').toDF(*column_headers_songs)
#df_users = sc.read.option('header', 'true').option('delimiter', '\t').csv('lastfm-dataset-1K/userid-profile.tsv')

In [4]:
#Convert timestamp to correct datatype and deal with null values
df_songs_clean = df_songs.fillna('not found')
#df_users_clean = df_users.fillna('unknown')
df_songs_clean = df_songs_clean.withColumn("timestamp", F.to_timestamp("timestamp"))
#df_users_clean = df_users_clean.withColumnRenamed('#id', 'userid')

In [5]:
df_songs_clean.show(20)
#df_users_clean.show(20)

+-----------+-------------------+---------------------+---------------+--------------------+--------------------+
|     userid|          timestamp|musicbrainz-artist-id|    artist-name|musicbrainz-track-id|          track-name|
+-----------+-------------------+---------------------+---------------+--------------------+--------------------+
|user_000001|2009-05-05 00:08:57| f1b1cf71-bd35-4e9...|      Deep Dish|           not found|Fuck Me Im Famous...|
|user_000001|2009-05-04 14:54:10| a7f7df4a-77d8-4f1...|       坂本龍一|           not found|Composition 0919 ...|
|user_000001|2009-05-04 14:52:04| a7f7df4a-77d8-4f1...|       坂本龍一|           not found|Mc2 (Live_2009_4_15)|
|user_000001|2009-05-04 14:42:52| a7f7df4a-77d8-4f1...|       坂本龍一|           not found|Hibari (Live_2009...|
|user_000001|2009-05-04 14:42:11| a7f7df4a-77d8-4f1...|       坂本龍一|           not found|Mc1 (Live_2009_4_15)|
|user_000001|2009-05-04 14:38:31| a7f7df4a-77d8-4f1...|       坂本龍一|           not found|To Stanford (Liv

In [6]:
#Count of unique songs (distinct track names) played by each user
df_songs_clean.groupBy('userid').agg(countDistinct('track-name').alias('unique songs played')).show(20)

+-----------+-------------------+
|     userid|unique songs played|
+-----------+-------------------+
|user_000066|                659|
|user_000098|                253|
|user_000113|               2072|
|user_000372|               4684|
|user_000424|               1976|
|user_000577|              16768|
|user_000708|               4615|
|user_000289|                962|
|user_000319|               6126|
|user_000445|               3588|
|user_000794|               5607|
|user_000339|               2490|
|user_000821|               1834|
|user_000171|                431|
|user_000182|               9386|
|user_000465|               1429|
|user_000534|               3823|
|user_000706|               4005|
|user_000801|               2717|
|user_000984|               1618|
+-----------+-------------------+
only showing top 20 rows
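
Note that the cell above counts distinct track-name values, so two different songs that share a title are counted once for a user. A possible refinement is to count distinct (artist, track) pairs instead; in PySpark, countDistinct accepts several columns for this. The idea is sketched below in plain Python on made-up rows, so the function name and data are assumptions rather than part of the notebook.

```python
from collections import defaultdict


def unique_songs_per_user(plays):
    """Count distinct (artist, track) pairs per user.

    plays: iterable of (userid, artist_name, track_name) tuples.
    """
    songs_by_user = defaultdict(set)
    for userid, artist, track in plays:
        songs_by_user[userid].add((artist, track))
    return {userid: len(songs) for userid, songs in songs_by_user.items()}


# Hypothetical rows: user u1 plays two different songs that are both called "Intro".
plays = [
    ("u1", "Artist A", "Intro"),
    ("u1", "Artist B", "Intro"),
    ("u1", "Artist A", "Intro"),  # repeat play, not a new song
    ("u2", "Artist A", "Intro"),
]
counts = unique_songs_per_user(plays)
print(counts)
```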



In [7]:
#Show the top 100 most played songs
num_times_song_played = df_songs_clean.groupBy('artist-name', 'track-name').count()
num_times_song_played.orderBy('count', ascending=False).limit(100).show(100)

+--------------------+--------------------+-----+
|         artist-name|          track-name|count|
+--------------------+--------------------+-----+
|  The Postal Service|  Such Great Heights| 3992|
|        Joy Division|Love Will Tear Us...| 3663|
|           Radiohead|        Karma Police| 3534|
|                Muse|Supermassive Blac...| 3483|
| Death Cab For Cutie|     Soul Meets Body| 3479|
|           The Knife|          Heartbeats| 3156|
|                Muse|           Starlight| 3060|
|         Arcade Fire|    Rebellion (Lies)| 3048|
|      Britney Spears|          Gimme More| 3004|
|         The Killers| When You Were Young| 2998|
|            Interpol|                Evil| 2989|
|          Kanye West|       Love Lockdown| 2950|
|      Massive Attack|            Teardrop| 2948|
| Death Cab For Cutie|I Will Follow You...| 2947|
|                Muse| Time Is Running Out| 2945|
|          Bloc Party|             Banquet| 2906|
|         Arcade Fire|Neighborhood #1 (...| 2826|


In [8]:
#Build user sessions: the tracks played in each session and the session length in seconds.
#Define a window partitioned by user and ordered by timestamp, so each row can be compared with the same user's previous play.
window_spec = Window.partitionBy('userid').orderBy('timestamp')
# Calculate the time difference in seconds between each play and the same user's previous play (0 for a user's first play).
raw_data_with_time_diff = df_songs_clean.withColumn('time_diff',F.when(F.lag('timestamp', 1).over(window_spec).isNull(), 0).otherwise(F.unix_timestamp('timestamp') - F.unix_timestamp(F.lag('timestamp', 1).over(window_spec))))
# Flag a play as the start of a new session (1) when it comes more than 20 minutes after the previous play, otherwise 0
raw_data_with_sessions = raw_data_with_time_diff.withColumn('new_session', (F.col('time_diff') > 20 * 60).cast('int'))
# Running sum of the new_session flags per user gives each play a per-user session ID
raw_data_with_sessions = raw_data_with_sessions.withColumn('session_id',F.sum('new_session').over(window_spec))
# Group plays into sessions, collecting the tracks played and the start and end times, then filter out sessions shorter than 20 minutes
sessions_with_duration = raw_data_with_sessions.groupBy('userid', 'session_id').agg(
    F.collect_list('track-name').alias('tracks_played'),
    F.min('timestamp').alias('start_timestamp'),
    F.max('timestamp').alias('end_timestamp')
    ).withColumn(
    'session_duration',
    (F.unix_timestamp('end_timestamp') - F.unix_timestamp('start_timestamp'))
    ).filter(
    F.col('session_duration') >= 20 * 60
    )
#Replace the per-user session counter with an id that is unique across all sessions (increasing, but not consecutive)
sessions_with_duration = sessions_with_duration.drop('session_id').withColumn('session_id', F.monotonically_increasing_id())
sessions_with_duration.show()

+-----------+--------------------+-------------------+-------------------+----------------+----------+
|     userid|       tracks_played|    start_timestamp|      end_timestamp|session_duration|session_id|
+-----------+--------------------+-------------------+-------------------+----------------+----------+
|user_000034|[How Soon Is Now ...|2005-09-15 22:01:31|2005-09-15 22:27:51|            1580|         0|
|user_000034|[Memory War, Offi...|2005-09-16 14:30:04|2005-09-16 20:47:41|           22657|         1|
|user_000034|[An Honest Mistak...|2005-09-19 20:55:36|2005-09-19 21:30:23|            2087|         2|
|user_000034|[Single Again, He...|2005-09-20 14:21:09|2005-09-20 16:07:42|            6393|         3|
|user_000034|[Single Again, He...|2005-09-20 16:49:55|2005-09-20 17:22:14|            1939|         4|
|user_000034|[Utilitarian, The...|2005-09-21 14:40:01|2005-09-21 15:24:10|            2649|         5|
|user_000034|[Single Again, He...|2005-09-21 16:02:10|2005-09-21 17:44:57
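
The session rule used in the cell above (a gap of more than 20 minutes starts a new session, and sessions shorter than 20 minutes are dropped) can be illustrated for a single user in plain Python. This is a sketch of the same logic, not the PySpark code itself; the function name, the dictionary layout and the sample plays are assumptions.

```python
from datetime import datetime, timedelta

TWENTY_MINUTES = timedelta(minutes=20)


def sessionize(plays, gap=TWENTY_MINUTES, min_duration=TWENTY_MINUTES):
    """Group one user's plays into sessions.

    plays: iterable of (timestamp, track_name) tuples for a single user.
    A play more than `gap` after the previous one starts a new session;
    sessions shorter than `min_duration` are dropped.
    """
    sessions = []
    for ts, track in sorted(plays):
        if sessions and ts - sessions[-1]["end"] <= gap:
            # Still within the gap: extend the current session.
            sessions[-1]["tracks"].append(track)
            sessions[-1]["end"] = ts
        else:
            # First play, or gap exceeded: start a new session.
            sessions.append({"start": ts, "end": ts, "tracks": [track]})
    return [s for s in sessions if s["end"] - s["start"] >= min_duration]


# Hypothetical plays: three tracks 15 minutes apart, then two more after a long break.
base = datetime(2009, 5, 4, 14, 0, 0)
plays = [
    (base, "a"),
    (base + timedelta(minutes=15), "b"),
    (base + timedelta(minutes=30), "c"),
    (base + timedelta(minutes=60), "d"),
    (base + timedelta(minutes=70), "e"),
]
result = sessionize(plays)
for s in result:
    print(s["tracks"], int((s["end"] - s["start"]).total_seconds()))
```

In this sample the first three plays form one 30-minute session, while the last two form a 10-minute session that is filtered out.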

In [9]:
#Due to data size and Java heap errors, sort only the session_id and session_duration columns rather than the full rows with their track lists
top_sessions = sessions_with_duration.select('session_id', 'session_duration')
top_sessions = top_sessions.orderBy(F.col('session_duration').desc())
top_sessions = top_sessions.limit(10).drop('session_duration')
top_sessions.show()

+------------+
|  session_id|
+------------+
| 25769825008|
| 60129569392|
| 25769825315|
|103079222335|
| 25769824996|
| 25769824982|
| 25769825029|
|103079222315|
|197568507219|
| 25769825009|
+------------+



In [10]:
#With more time, I would have filtered each partition by session_id and then rejoined the partitions to build the final dataframe.
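
One possible reading of this idea, sketched in plain Python rather than PySpark: find the longest sessions within each partition first, then merge those small per-partition results to get the overall top sessions, so the full dataset never has to be sorted at once. The function name and the way the rows are split into partitions are assumptions; the (session_id, session_duration) pairs reuse the first rows of the sessions table shown earlier.

```python
import heapq


def top_k_sessions(partitions, k=10):
    """Return the k longest sessions across all partitions.

    partitions: iterable of lists of (session_id, session_duration) tuples.
    """
    candidates = []
    for partition in partitions:
        # At most k rows survive from each partition.
        candidates.extend(heapq.nlargest(k, partition, key=lambda s: s[1]))
    # Merge step: pick the overall top k from the per-partition winners.
    return heapq.nlargest(k, candidates, key=lambda s: s[1])


# Hypothetical split of six sessions into two partitions.
partitions = [
    [(0, 1580), (1, 22657), (2, 2087)],
    [(3, 6393), (4, 1939), (5, 2649)],
]
top_two = top_k_sessions(partitions, k=2)
print(top_two)
```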