# Work with big data

Data Science - CA#03: *Big data cleaning* - Spring 1403 \
The provided dataset (`spotify.parquet`) contains information about songs streamed on Spotify, an audio streaming and media service provider. For each song, you have access to its album, artist, musical characteristics, and release date.

In [1]:
from IPython.display import display, HTML
display(HTML('<style>pre { white-space: pre !important; }</style>'))

In [4]:
!pip install pyspark


## <center> Warm-up! </center>

### helper function for showing info:

In [None]:
from pyspark.sql import DataFrame

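def show_df_info(df: DataFrame, num_rows: int = 5) -> None:
    """Print the row count of df, then show its first num_rows rows."""
    row_count = df.count()  # an action: triggers a full pass over the data
    print(f"Number of rows: {row_count}")
    df.show(num_rows)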


### Step 1: Read the CSV file

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, min, max, mean, stddev

spark = SparkSession.builder \
    .appName("StockAnalysis") \
    .getOrCreate()

df = spark.read.csv("stocks.csv", header=True, inferSchema=True)


### Step 2: Schema of data

In [7]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Step 3: Showing opening, closing and volume of Records with closing price less than 500

In [12]:
df_less_than_500 = df.filter(df["Close"] < 500).select("Open", "Close", "Volume")
show_df_info(df_less_than_500)

Number of rows: 1359
+----------+------------------+---------+
|      Open|             Close|   Volume|
+----------+------------------+---------+
|213.429998|        214.009998|123432400|
|214.599998|        214.379993|150476200|
|214.379993|        210.969995|138040000|
|    211.75|            210.58|119282800|
|210.299994|211.98000499999998|111902700|
+----------+------------------+---------+
only showing top 5 rows



### Step 4: Records with opening price more than 200 and closing price less than 200

In [13]:
df_open_more_200_close_less_200 = df.filter((df["Open"] > 200) & (df["Close"] < 200))
show_df_info(df_open_more_200_close_less_200)

Number of rows: 3
+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



### Step 5: Extract year from date and save it in a new column

In [16]:
df_with_year = df.withColumn("Year", year("Date"))
show_df_info(df_with_year)

Number of rows: 1762
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
only showing top 5 rows

### Step 6: Minimum volumes traded for each year

In [19]:
min_volume_per_year = df_with_year.groupBy("Year").agg(min("Volume").alias("minVolume"))
show_df_info(min_volume_per_year, 7)

Number of rows: 7
+----+---------+
|Year|minVolume|
+----+---------+
|2015| 13046400|
|2013| 41888700|
|2014| 14479600|
|2012| 43938300|
|2016| 11475900|
|2010| 39373600|
|2011| 44915500|
+----+---------+



### Step 7: Highest low price for each year and month

In [21]:
max_low_price_per_year_month = df_with_year.groupBy("Year", month("Date").alias("Month")) \
    .agg(max("Low").alias("maxLow"))
show_df_info(max_low_price_per_year_month)

Number of rows: 84
+----+-----+----------+
|Year|Month|    maxLow|
+----+-----+----------+
|2012|   10|665.550026|
|2010|    7|260.300003|
|2010|   12|325.099991|
|2015|    2|131.169998|
|2014|    4|589.799988|
+----+-----+----------+
only showing top 5 rows



### Step 8: Mean and standard deviation of high price

In [22]:
# One aggregation (a single Spark job) computes both statistics
stats = df.agg(mean("High").alias("MeanHighPrice"), stddev("High").alias("StddevHighPrice")).collect()[0]
mean_high_price, stddev_high_price = stats["MeanHighPrice"], stats["StddevHighPrice"]
print(f"Mean High Price: {mean_high_price:.2f}")
print(f"Standard Deviation of High Price: {stddev_high_price:.2f}")

# Stop SparkSession
spark.stop()


Mean High Price: 315.91
Standard Deviation of High Price: 186.90
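
A note on what this number means: Spark's `stddev` is an alias for `stddev_samp`, so the value printed is the sample standard deviation, which divides by $n - 1$ rather than $n$. As a sketch, writing $x_i$ for the `High` price of row $i$ and assuming none of the $n = 1762$ rows has a null `High` value:

$$
\bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i,
\qquad
s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2}
$$

For the population standard deviation (dividing by $n$), Spark provides `stddev_pop` instead.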


## <center>Main Task</center>

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, mean, stddev

spark = SparkSession.builder \
    .appName("SpotifyAnalysis") \
    .getOrCreate()
df = spark.read.load("spotify.parquet")

In [27]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- album: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- artist_ids: string (nullable = true)
 |-- track_number: long (nullable = true)
 |-- disc_number: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- year: long (nullable = true)
 |-- release_date: string (nullable = true)



In [7]:
show_df_info(df,10)

Number of rows: 1204025
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-----------+--------+------------------+------+---+-------------------+----+-----------+-----------------+--------------------+--------+------------------+------------------+-----------+--------------+----+------------+
|                  id|                name|               album|            album_id|             artists|          artist_ids|track_number|disc_number|explicit|      danceability|energy|key|           loudness|mode|speechiness|     acousticness|    instrumentalness|liveness|           valence|             tempo|duration_ms|time_signature|year|release_date|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-----------+--------+------------------+------+---+-------------------+----+-----------+-----------------+----------

## <center>Questions</center>

### 1. How do Spark and Hadoop work? What does the term ‘lazy evaluation’ mean for them? Explain with a simple example.

#### Hadoop and Spark Comparison

##### Hadoop
- Utilizes HDFS for distributed storage and MapReduce for distributed processing.
- Well-suited for batch processing of large datasets but can be less efficient for iterative or interactive workloads.

##### Spark
- Employs in-memory computing and offers various APIs including RDDs, Spark SQL, Spark Streaming, MLlib, and GraphX.
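- Often faster than Hadoop MapReduce, especially for iterative and interactive workloads, because intermediate data can stay in memory instead of being written to disk between steps. Suitable for a wide range of use cases including batch processing, real-time processing, iterative algorithms, and interactive data analysis.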

#### Lazy Evaluation in Spark and Hadoop

In Spark, lazy evaluation means that transformations on data are not executed immediately. Spark records them as a plan (a lineage of transformations) and runs that plan only when an action is called, which lets it optimize the whole computation at once. Hadoop MapReduce has no direct equivalent: each job runs when it is submitted and writes its output to HDFS before the next job starts, although higher-level tools built on Hadoop, such as Pig, do build up a plan before executing it.

For example, if you apply several transformations to a Spark dataset, such as filtering, mapping, and aggregating, Spark does not execute any of them right away. It waits until an action like `collect()` or `show()` is called. Deferring execution lets Spark combine the transformations and run them together, for instance by applying a filter and a map in a single pass over the data.
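
The behavior can be imitated in a few lines of plain Python. The sketch below is a toy model, not Spark's implementation: `LazyDataset`, `is_cheap`, and the sample prices are hypothetical names and values made up for illustration. Transformations only extend a recorded plan; the plan runs when the action `collect()` is called.

```python
from typing import Any, Callable, Iterable, List, Tuple


class LazyDataset:
    """Toy stand-in for a Spark dataset: records steps, runs them on an action."""

    def __init__(self, source: Iterable[Any], plan: Tuple = ()):
        self._source = source
        self._plan = plan  # recorded transformations, not yet executed

    # Transformations: return a new LazyDataset with a longer plan, do no work.
    def filter(self, predicate: Callable[[Any], bool]) -> "LazyDataset":
        return LazyDataset(self._source, self._plan + (("filter", predicate),))

    def map(self, func: Callable[[Any], Any]) -> "LazyDataset":
        return LazyDataset(self._source, self._plan + (("map", func),))

    # Action: only now is the recorded plan executed, in a single pass.
    def collect(self) -> List[Any]:
        result = []
        for item in self._source:
            keep = True
            for kind, func in self._plan:
                if kind == "filter":
                    if not func(item):
                        keep = False
                        break
                else:  # "map"
                    item = func(item)
            if keep:
                result.append(item)
        return result


calls = []  # records every time the predicate actually runs


def is_cheap(close: float) -> bool:
    calls.append(close)
    return close < 500


closes = [214.01, 665.55, 131.17, 589.80]
pipeline = LazyDataset(closes).filter(is_cheap).map(lambda c: round(c))

calls_before_action = len(calls)  # nothing has executed yet
rounded = pipeline.collect()      # the action triggers the whole plan
calls_after_action = len(calls)   # one predicate call per input value
```

In PySpark the same pattern appears when a chain such as `df.filter(df["Close"] < 500).select("Open", "Close")` returns instantly and the work only starts once `show()` or `count()` is called on the result.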

### 2. Your main task’s dataset has about 1,200,000 rows, which makes it quite hard, and even sometimes impossible, to work with. Explain how parquet files try to solve this problem, compared to normal file formats like csv.

#### Advantages of Parquet Files

##### Columnar Storage
- Parquet files store data in a columnar format, storing each column separately.
- Enables efficient compression and encoding techniques on a per-column basis, reducing storage space compared to row-based formats like CSV.

##### Predicate Pushdown
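- Parquet files store statistics such as minimum and maximum values for each column within a row group, so a reader can skip row groups that cannot match a query predicate (predicate pushdown). Combined with columnar storage, only the columns a query needs are read (column pruning).
- Reduces I/O overhead and improves query performance, especially for selective queries.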

##### Schema Evolution
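- Parquet files include metadata describing the data schema, so each file is self-describing.
- New columns can be added in later files without rewriting existing ones, and readers such as Spark can merge the schemas at read time. Changing the type of an existing column is more restricted and typically requires rewriting the data.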

##### Compression
- Parquet files support several compression codecs, such as Snappy, GZIP, and LZ4.
- Compression reduces the storage footprint and the amount of data read from disk, at the cost of some CPU time for compressing and decompressing.

##### Partitioning
- A Parquet dataset can be split into directories based on the values of one or more columns.
- Improves query performance by enabling partition pruning, scanning only relevant partitions during query execution.

Overall, Parquet files offer significant advantages over traditional file formats like CSV when working with large datasets, making them a preferred choice for big data processing frameworks.
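
To make column pruning and predicate pushdown concrete, here is a toy model in plain Python. It is only a sketch of the idea, not Parquet's actual on-disk format: the `row_groups` structure, the `stats` keys, and the sample values are all hypothetical. Each row group keeps its columns as separate lists together with min/max statistics for `year`, and the reader uses those statistics to skip groups that cannot match.

```python
from typing import Dict, List

# Toy columnar layout: each "row group" stores every column as its own list.
row_groups: List[Dict] = [
    {"year": [1999, 2001, 2003], "name": ["a", "b", "c"], "tempo": [120.0, 98.5, 140.2]},
    {"year": [2010, 2012, 2014], "name": ["d", "e", "f"], "tempo": [110.0, 90.0, 128.0]},
    {"year": [2016, 2018, 2020], "name": ["g", "h", "i"], "tempo": [100.0, 75.5, 133.3]},
]

# Statistics are computed once, at write time, and stored next to the data.
for group in row_groups:
    group["stats"] = {"year_min": min(group["year"]), "year_max": max(group["year"])}


def names_released_after(groups: List[Dict], min_year: int):
    """Return names with year >= min_year and the number of row groups scanned."""
    names, scanned = [], 0
    for group in groups:
        # Predicate pushdown: skip the whole group if its statistics rule it out.
        if group["stats"]["year_max"] < min_year:
            continue
        scanned += 1
        # Column pruning: only `year` and `name` are touched; `tempo` is never read.
        for year, name in zip(group["year"], group["name"]):
            if year >= min_year:
                names.append(name)
    return names, scanned


names, scanned = names_released_after(row_groups, 2012)
```

With a CSV file, answering the same question means parsing every field of every row. In PySpark, a query such as `spark.read.parquet("spotify.parquet").select("name", "year").filter("year >= 2012")` gives the Parquet reader the chance to apply both optimizations.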

### 3. Spark doesn’t save checkpoints. How can we enforce it to do so?

#### Enable Checkpointing

Before using checkpointing in Spark, you need to enable it by setting the checkpoint directory using the SparkContext.setCheckpointDir() method. Ensure that this directory is accessible by all nodes in the Spark cluster.

In [None]:
spark.sparkContext.setCheckpointDir("hdfs://path/to/checkpoint/dir")

#### Perform Checkpointing

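Once the checkpoint directory is set, you can checkpoint an RDD or DataFrame by calling its `checkpoint()` method. The two behave differently. On an RDD, `checkpoint()` only marks the RDD for checkpointing; the data is written the next time an action runs on it. On a DataFrame, `checkpoint()` is eager by default: it runs the computation up to that point, saves the result to the checkpoint directory, and returns a new DataFrame whose lineage is truncated.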

In [None]:
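rdd.checkpoint()      # lazy: marks the RDD; data is written on the next action
rdd.count()           # an action that materializes the checkpoint
df = df.checkpoint()  # eager by default; returns a new DataFrame with truncated lineage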

### 4. Top companies stream their data on a regular routine, e.g. daily. How can we save data, so that we could filter it based on specific columns, e.g. date, faster than regular filtering?

#### Partitioning by Date

If data arrives daily, you can partition it by date: each day's data is saved in its own directory, named after the date (for example `date=2024-05-01`). Filtering on the date column is then fast, because Spark or any other processing framework can read only the directories for the requested dates and skip the rest.

#### File Formats

Choose a file format that supports partitioning efficiently, such as Parquet. Parquet files can be partitioned based on one or more columns, enabling efficient pruning of irrelevant partitions during query execution.

#### Optimized Storage

Ensure that the data is stored in a distributed file system like HDFS or an object store like Amazon S3. This ensures that the data is distributed across multiple nodes, allowing for parallel processing and faster retrieval.

#### Optimize for Query Patterns

Analyze the typical query patterns and partition the data accordingly. For example, if filtering by date is the most common operation, partitioning by date would be beneficial. Similarly, if filtering by another column is more common, consider partitioning by that column instead.
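
The directory layout behind date partitioning can be sketched with the standard library alone. The example below is illustrative: `daily_batches`, `read_day`, and the sample rows are hypothetical, and CSV files stand in for Parquet to keep it dependency-free. It writes one `date=<value>` directory per day, which is the same naming scheme Spark produces with `df.write.partitionBy("date").parquet(path)`, and then reads a single day by opening only that directory.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical daily batches: date -> rows of (track_id, streams).
daily_batches = {
    "2024-05-01": [("t1", 10), ("t2", 7)],
    "2024-05-02": [("t1", 12)],
    "2024-05-03": [("t3", 5), ("t2", 9)],
}

root = Path(tempfile.mkdtemp())

# Write each day's rows under its own `date=<value>` directory.
for date, rows in daily_batches.items():
    part_dir = root / f"date={date}"
    part_dir.mkdir()
    with open(part_dir / "part-0.csv", "w", newline="") as f:
        csv.writer(f).writerows(rows)


def read_day(base: Path, date: str):
    """Read one day's rows by opening only that day's directory."""
    rows = []
    for file in sorted((base / f"date={date}").glob("*.csv")):
        with open(file, newline="") as f:
            rows.extend((track, int(streams)) for track, streams in csv.reader(f))
    return rows


may_2 = read_day(root, "2024-05-02")
partitions = sorted(p.name for p in root.iterdir())
```

The files for the other two days are never opened. When Spark reads a dataset partitioned this way and the query filters on `date`, it can prune partitions in the same manner.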

### 5. Let's face off Pandas and PySpark in the data analysis arena! When does each library truly shine, and why?

#### Pandas vs. PySpark: A Comparative Analysis

##### Data Size

- Pandas: Best suited for working with small to medium-sized datasets that can fit into memory. It performs well when the dataset fits comfortably on a single machine.
- PySpark: Designed for handling large-scale datasets that exceed the memory capacity of a single machine. PySpark distributes the data across a cluster of machines, allowing for processing of massive datasets that cannot be handled by Pandas alone.

##### Processing Complexity

- Pandas: Ideal for complex data manipulation and analysis tasks due to its rich set of functions and expressive syntax. It offers a wide range of operations for data cleaning, transformation, and analysis, making it suitable for intricate data processing workflows.
- PySpark: Well-suited for processing complex data transformations and analytics at scale. PySpark leverages distributed computing to handle complex operations on large datasets efficiently. It excels in scenarios where processing complexity is high and parallel processing is necessary for timely analysis.

##### User Experience

- Pandas: Offers a user-friendly and intuitive interface, making it easy for data analysts and scientists to perform exploratory data analysis and build models. Pandas' syntax is concise and familiar to Python users, which contributes to its popularity and ease of use.
- PySpark: Requires a deeper understanding of distributed computing concepts and Spark's API. While PySpark provides functionality similar to Pandas, moving from Pandas to PySpark involves a learning curve, especially for users unfamiliar with distributed computing. Once users are comfortable with Spark's API, they can take advantage of its scalability and performance.