## Introduction to Data Science | 4022 | Dr. Bahrak & Dr. Yaghoobzadeh
## CA3
***
### Amirreza Akbari | 810899045
### Reza Baghestani | 810899046
### Hananeh Jamali | 810899053
***
### 1402/01/29

## Install & Import Libraries

In [45]:
import importlib.util

# Install PySpark only when it is not already importable.
# pip is invoked through ``sys.executable`` so the package is installed into
# the same interpreter that runs this notebook kernel; a bare shell ``pip``
# (as with the IPython ``!pip`` escape) may belong to a different Python
# installation. ``check_call`` raises if the install fails instead of letting
# the later ``from pyspark...`` imports fail with a less helpful error, and
# this form is plain Python, so the cell also runs outside IPython.
if importlib.util.find_spec("pyspark") is None:
    import subprocess
    import sys

    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark"])

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, max
from pyspark.sql.functions import mean, stddev

Jupyter renders cell output as HTML, so long lines inside `<pre>` blocks wrap by default. The next cell disables that wrapping so that wide Spark DataFrame tables stay aligned.

In [46]:
from IPython.display import HTML, display

# Inject a CSS rule into the notebook page: `white-space: pre` stops text in
# <pre> elements (where `show()` tables are printed) from wrapping.
display(
    HTML('<style>pre { white-space: pre !important; }</style>')
)

## Warm-Up

In [47]:
# Warm-up: basic Spark DataFrame operations on the stocks dataset (stocks.csv).
# Spark SQL functions are referenced through the `F` namespace so this cell
# does not depend on module-level names such as `max` that shadow builtins.
from pyspark.sql import functions as F


def _print_step_header(step):
    """Print the banner that separates the output of consecutive steps."""
    print(f"\n--------------------------------------------------------     STEP {step}     --------------------------------------------------------")


def _round_or_none(value, ndigits=2):
    """Round ``value`` to ``ndigits`` places, passing ``None`` through.

    Spark aggregates come back as ``None`` when their SQL result is null
    (e.g. the mean of an empty column), and ``round(None, 2)`` would raise.
    """
    return None if value is None else round(value, ndigits)


_print_step_header(1)
# Step 1: Read the CSV file

# Create (or reuse) a SparkSession. It is stopped in the `finally` clause
# below so that a failing step does not leave the session running.
spark = SparkSession.builder \
    .appName("Stocks Analysis") \
    .getOrCreate()

try:
    # Read the CSV file into a DataFrame: the first row is the header and
    # column types are inferred from the data.
    df = spark.read.csv("stocks.csv", header=True, inferSchema=True)
    print("CSV file reading done!")

    _print_step_header(2)
    # Step 2: Find out about the schema of data
    print("The schema of data:\n")
    df.printSchema()
    print("\nnumber of dataframe rows:", df.count())

    _print_step_header(3)
    # Step 3: Select records with closing price less than 500
    less_than_500_df = df.filter(df['Close'] < 500).select('Open', 'Close', 'Volume')
    print("Records with closing price less than 500:\n")
    less_than_500_df.show()
    print("number of records:", less_than_500_df.count())

    _print_step_header(4)
    # Step 4: Find out records with opening price more than 200 and closing price less than 200
    more_than_200_less_than_200_df = df.filter((df['Open'] > 200) & (df['Close'] < 200))
    print("Records with opening price more than 200 and closing price less than 200:\n")
    more_than_200_less_than_200_df.show()
    print("number of records:", more_than_200_less_than_200_df.count())

    _print_step_header(5)
    # Step 5: Extract the year from the date and save it in a new column
    df_with_year = df.withColumn('Year', F.year(df['Date']))
    print("The dataframe with the new column \"Year\":\n")
    df_with_year.show()

    _print_step_header(6)
    # Step 6: For each year, show the minimum volume traded.
    # `agg(...).alias(...)` names the result directly instead of renaming the
    # auto-generated "min(Volume)" column. `orderBy` makes the printed order
    # deterministic, since groupBy output has no guaranteed order.
    min_volume_by_year = (
        df_with_year.groupBy('Year')
        .agg(F.min('Volume').alias('minVolume'))
        .orderBy('Year')
    )
    print("Minimum volumes traded for each year:\n")
    min_volume_by_year.show()

    _print_step_header(7)
    # Step 7: For each year and month, show the highest low price.
    # Reuse the Year column from step 5 instead of recomputing it.
    df_with_year_month = df_with_year.withColumn('Month', F.month(df_with_year['Date']))
    max_low_price_by_year_month = (
        df_with_year_month.groupBy('Year', 'Month')
        .agg(F.max('Low').alias('maxLow'))
        .orderBy('Year', 'Month')
    )
    print("Highest \"Low Price\" for each year and month:\n")
    max_low_price_by_year_month.show()
    print("\nnumber of rows:", max_low_price_by_year_month.count())

    _print_step_header(8)
    # Step 8: Calculate mean and standard deviation of high price over the
    # whole data frame. Both aggregates are computed in a single Spark job.
    high_stats = df.select(
        F.mean('High').alias('meanHigh'),
        F.stddev('High').alias('stddevHigh'),
    ).first()
    mean_high_price = high_stats['meanHigh']
    stddev_high_price = high_stats['stddevHigh']
    print("Mean High Price:", _round_or_none(mean_high_price))
    print("Standard Deviation of High Price:", _round_or_none(stddev_high_price))
finally:
    # Stop the SparkSession
    spark.stop()


--------------------------------------------------------     STEP 1     --------------------------------------------------------
CSV file reading done!

--------------------------------------------------------     STEP 2     --------------------------------------------------------
The schema of data:

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

number of dataframe rows: 1762

--------------------------------------------------------     STEP 3     --------------------------------------------------------
Records with closing price less than 500:

+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        

In [9]:
# Spotify analysis: schema inspection, column preprocessing, per-year
# aggregation, array (explode) handling and a top-K query.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

separator = "=" * 80

# Create (or reuse) a SparkSession. It is stopped in the `finally` clause
# below so that an error in any step does not leave the session running.
spark = SparkSession.builder \
    .appName("Spotify Analysis") \
    .getOrCreate()

try:
    # Step 1: Check the Schema
    df = spark.read.parquet("spotify.parquet")
    df.printSchema()

    # Preprocess Columns: 'release_date' is read as a string; convert it to a date.
    df = df.withColumn('release_date', F.to_date(df['release_date']))

    # Aggregation: average danceability and energy of songs per year.
    # orderBy makes the output order deterministic (groupBy output is unordered).
    average_characteristics = (
        df.groupBy('year')
        .agg(
            F.avg('danceability').alias('avg_danceability'),
            F.avg('energy').alias('avg_energy'),
        )
        .orderBy('year')
    )

    # Dealing with Array Columns: 'artists' is a string column. Split it on ", "
    # into an array and explode it so each artist gets a separate row (in a
    # new 'artist' column).
    # NOTE(review): if 'artists' stores a stringified list such as "['A', 'B']",
    # the brackets and quotes remain on the exploded values; verify against
    # the data and strip them if so.
    df_exploded = df.withColumn('artist', F.explode(F.split(df['artists'], ', ')))

    # Top-K Records: the schema has no stream/play-count column, so "most
    # streamed" cannot be computed. This ranks tracks by duration and is
    # labelled accordingly.
    top_songs = df.orderBy(df['duration_ms'].desc()).limit(10)

    # Show the resulting DataFrame after exploding the artists array
    print("Exploded DataFrame with Each Artist in a Separate Row:")
    df_exploded.show()

    print(separator)

    # Show the average characteristics of songs per year
    print("Average Characteristics of Songs Per Year:")
    average_characteristics.show()

    print(separator)

    # Show the 10 longest songs
    print("Top 10 Longest Songs (by duration_ms):")
    top_songs.show()
finally:
    # Stop the SparkSession
    spark.stop()


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- album: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- artist_ids: string (nullable = true)
 |-- track_number: long (nullable = true)
 |-- disc_number: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: long (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: long (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: long (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- year: long (nullable = true)
 |-- release_date: string (nullable = true)
+--------------------+----------------