# Comprehensive Data Analysis with PySpark

# Introduction

In this notebook, we will be exploring the powerful capabilities of PySpark. PySpark is the Python library for Apache Spark, an open-source, distributed computing system used for big data processing and analytics.

We will use Google Colab for our work. Colab is not required for PySpark, but it offers a convenient cloud-based environment with free access to computational resources such as CPUs and GPUs.

Our journey will take us through the process of installing PySpark, loading and cleaning data, performing some exploratory data analysis, and finally, drawing some conclusions from our data. We hope this notebook serves as a useful guide for your own data processing and analysis tasks with PySpark.

# Table of Contents
1. [Introduction](#Introduction)
2. [Exploratory Data Analysis (EDA) on the First Dataset](#eda-first-dataset)
3. [Query the data](#query-data)
4. [EDA on another dataset, query & visualization](#eda-second-dataset)
5. [Join the two datasets, query & visualization](#eda-joined-dataset)


### Let's mount Google Drive

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

Installing the PySpark module



In [None]:
# !pip install pyspark

Importing the modules

In [None]:

from pyspark.sql import SparkSession
# Note: importing max from pyspark.sql.functions shadows Python's built-in max in this notebook
from pyspark.sql.functions import count, desc, col, max, when, struct
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

Creating the Spark session

In [None]:
spark = SparkSession.builder.appName('spark_app').getOrCreate()

# Task 1 : EDA on the First Dataset <a id='eda-first-dataset'></a>
Importing the *listenings.csv* file:

In [None]:
listening_df = spark.read.csv('/content/drive/MyDrive/dataset/listenings.csv', inferSchema=True, header=True) # direct read approach

# Format & options approach
# listening_df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/content/drive/MyDrive/dataset/listenings.csv')

let's check the data:

In [None]:
listening_df.show(10)

let's see the shape of our dataframe before cleaning

In [None]:
shape = (listening_df.count(), len(listening_df.columns))
print(shape)

In [None]:
# check the schema
listening_df.printSchema()

In [None]:
# check if there are null values
listening_df.select([count(when(col(c).isNull(), c)).alias(c) for c in listening_df.columns]).show()
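
The null check above relies on two behaviours: `when(condition, value)` without an `otherwise` yields null wherever the condition is false, and `count` skips nulls. The net effect is a per-column tally of missing values. A plain-Python sketch of the same tally follows; the sample rows are made up for illustration and are not taken from the dataset, and `null_counts` is a hypothetical helper name.

```python
# Hypothetical sample rows; None stands in for a Spark null.
rows = [
    {"user_id": "u1", "track": "Song A", "artist": "X", "album": None},
    {"user_id": "u2", "track": None, "artist": "Y", "album": None},
    {"user_id": "u3", "track": "Song C", "artist": "Z", "album": "Album C"},
]
columns = ["user_id", "track", "artist", "album"]


def null_counts(rows, columns):
    """Count missing values per column, mirroring count(when(isNull, ...))."""
    return {c: sum(1 for r in rows if r[c] is None) for c in columns}


result = null_counts(rows, columns)
print(result)
```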

Let's drop the `date` column, which we don't need:

In [None]:
listening_df = listening_df.drop('date')

drop the null values:

In [None]:
listening_df = listening_df.na.drop()

let's check the dataset again:

In [None]:
listening_df.show()

let's see the shape of our dataframe after cleaning

In [None]:
shape = (listening_df.count(), len(listening_df.columns))
print(shape)

# Task 2: Query the data <a id='query-data'></a>

**Query #0:**
select two columns: track and artist

In [None]:
q0 = listening_df.select('artist', 'track')
q0.show(truncate=False)

**Query #1**:

Let's find all of the records of those users who have listened to ***Rihanna***

In [None]:
q1 = listening_df.filter(col('artist') == 'Rihanna')
q1.show(truncate=False)

**Query #2:**

Let's find the top 10 users who are fans of ***Rihanna***

In [None]:
# Filter on artist first, then count listenings per user
q2 = listening_df.filter(col('artist') == 'Rihanna') \
     .groupby('user_id').agg(count('user_id').alias('count')) \
     .orderBy(desc('count')).limit(10)

q2.show()

**Query #3:**

Find the top 10 most popular tracks

In [None]:
q3 = (listening_df
      .groupby('artist', 'track')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))

q3.show()

**Query #4:**

Find the top 10 most popular tracks of ***Rihanna***

In [None]:
q4 = (listening_df
      .filter(col('artist') == 'Rihanna')
      .groupby('artist', 'track')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))

q4.show()

**Query #5:**

Find the top 10 most popular albums

In [None]:
q5 = (listening_df
      .groupby('artist', 'album')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))  # descending, so the most-played albums come first
      .limit(10))

q5.show()

# Task 3 : EDA on another dataset, query & visualization <a id='eda-second-dataset'></a>
importing the ***genre.csv*** file:

In [None]:
genre_df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/content/drive/MyDrive/dataset/genre.csv')

let's check the data

In [None]:
genre_df.show()

check the top 10 genres

In [None]:
top_genres = (genre_df.groupBy('genre')
              .agg(count('genre').alias('count'))
              .orderBy('count', ascending=False)
              .limit(10))
top_genres.show()

Visualize the top 10 genres

In [None]:
# Convert to Pandas DataFrame
top_genres_pd = top_genres.toPandas()

# Plotting
plt.figure(figsize=(10, 6))
sns.barplot(x='count', y='genre', data=top_genres_pd, palette='viridis')
plt.title('Top 10 Genres')
plt.xlabel('Count')
plt.ylabel('Genre')
plt.show()


In [None]:
listening_df.show()

Let's inner join these two data frames

In [None]:
# Remove all duplicate rows in the genre dataset
df_cleaned = genre_df.dropDuplicates()

# Join the two datasets
data = listening_df.join(df_cleaned, ['artist'], 'inner')
data.show()

# After joining the two datasets, we can see some abnormal values in the genre column.
# Checking the original dataset shows that the genre column contains values that are not genres.
# The source dataset is not clean: artist names also appear in the genre column.
# Because there are many of these abnormal values, further cleaning is needed.
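
One thing worth checking after this join: `dropDuplicates()` removes only rows that are identical in every column, so an artist listed with two different genre values still appears twice in `df_cleaned`, and each of that artist's listenings is then emitted twice by the inner join. A plain-Python sketch of that effect follows; all rows are made up and `inner_join` is a hypothetical helper, not part of PySpark.

```python
# Hypothetical listening records: (user_id, artist)
listenings = [("u1", "Rihanna"), ("u2", "Rihanna"), ("u3", "Muse")]

# Hypothetical genre rows after removing exact duplicates: (artist, genre).
# "Rihanna" still has two distinct rows, one of them not a real genre.
genres = [("Rihanna", "pop"), ("Rihanna", "Jay-Z"), ("Muse", "rock")]


def inner_join(left, right):
    """Nested-loop inner join on artist; returns (user_id, artist, genre)."""
    return [(user, artist, genre)
            for user, artist in left
            for g_artist, genre in right
            if artist == g_artist]


joined = inner_join(listenings, genres)
print(len(listenings), len(joined))
```

Comparing `listening_df.count()` with `data.count()` is a quick way to see whether the same row multiplication is happening in the real data.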

**Query #6**

Find the top 10 users who are fans of ***pop*** music

In [None]:
q6 = (data.filter(col('genre') == 'pop')
      .groupBy('user_id')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))

q6.show()

**Query #7**

Find the top 10 most popular genres

In [None]:
q7 = (data.groupBy('genre')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))

q7.show()

# Task 4: Join the two datasets, query & visualization <a id='eda-joined-dataset'></a>
**Query #8**

Find each user's favourite genre

In [None]:
# To find out each user's favorite genre, we need to count the number of times each user listens to each genre first.
# Counting Genres per User
q8_1 = (data.groupBy('user_id', 'genre')
        .agg(count('*').alias('count'))
        .orderBy('user_id'))

q8_1.show()

In [None]:
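# Finding the most frequent genre per user: structs compare field by field,
# so max(struct(count, genre)) picks the row with the highest count.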
q8_2 = (q8_1.groupBy('user_id')
        .agg(max(struct(col('count'), col('genre'))).alias('max'))
        .select(col('user_id'), col('max.genre')))

q8_2.show()
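
The `max(struct(count, genre))` step works because Spark orders structs field by field, left to right, much as Python orders tuples: the largest `count` wins, and when two genres tie on `count`, the one that sorts last in string order is kept. A plain-Python sketch of that ordering follows, using made-up counts for a single hypothetical user. It calls `builtins.max` explicitly because this notebook imports PySpark's `max`, which shadows the built-in.

```python
import builtins

# Hypothetical (count, genre) pairs for one user, mirroring struct(count, genre).
genre_counts = [(12, "pop"), (30, "rock"), (30, "metal"), (5, "jazz")]

# Tuples compare element by element: first by count, then by genre string.
# "rock" and "metal" tie on 30, and "rock" sorts after "metal".
top_count, top_genre = builtins.max(genre_counts)
print(top_count, top_genre)
```

If ties should be broken differently, or all tied genres kept, a window function such as `row_number()` or `rank()` over a per-user partition is a common alternative.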

**Query #9**

Find out how many pop, rock, metal and hip hop singers we have, then visualize the result using a bar chart

In [None]:
q9 = (genre_df.select('genre')
      .filter(col('genre').isin('pop', 'rock', 'metal', 'hip hop'))
      .groupby('genre')
      .agg(count('genre').alias('count')))
q9.show()

Now, let's visualize the results using ***matplotlib***

In [None]:
q9_list = q9.collect()

In [None]:
labels = [row['genre'] for row in q9_list]
counts = [row['count'] for row in q9_list]

In [None]:
print(labels)
print(counts)

Now let's visualize these two lists using a bar chart

In [None]:
plt.figure(figsize=(10, 6))
plt.bar(labels, counts, color='skyblue')
plt.xlabel('Genre')
plt.ylabel('Count')
plt.title('Counts of Selected Genres')
plt.xticks(rotation=45)
plt.show()
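
Rows returned by `collect()` after a `groupby` come back in no guaranteed order, so the bars in the chart above may be arranged arbitrarily. A minimal sketch of sorting the two lists together by count before plotting follows; the values are placeholders, not results from the dataset, and the `example_` names are hypothetical so they do not overwrite `labels` and `counts`.

```python
# Placeholder values standing in for the labels and counts built from q9.
example_labels = ["rock", "hip hop", "pop", "metal"]
example_counts = [120, 45, 300, 80]

# Pair each label with its count, sort by count (largest first), then unzip.
pairs = sorted(zip(example_labels, example_counts),
               key=lambda pair: pair[1], reverse=True)
sorted_labels = [label for label, _ in pairs]
sorted_counts = [n for _, n in pairs]

print(sorted_labels)
print(sorted_counts)
```

Passing `sorted_labels` and `sorted_counts` to `plt.bar` in place of the unsorted lists would then draw the bars from largest to smallest.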