<a href="https://colab.research.google.com/github/ChinYuenAu/BigDataAnalytics/blob/main/CS7280proj3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download the dataset from Kaggle

## Feel free to skip this step
### You can obtain the CSV file from https://storage.googleapis.com/cs7280-proj3/redfin.csv instead, which saves the time of downloading the data again.

In [None]:
!pip install kaggle



In [None]:
# Mount Google Drive inside the Colab runtime; kaggle.json is copied from it below.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

KeyboardInterrupt: 

In [None]:
!mkdir ~/.kaggle

In [None]:
# Copy the Kaggle API token from the mounted Google Drive into ~/.kaggle/.
!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/

In [None]:
# Step 4: Set appropriate permissions for the Kaggle API token
# (mode 600: readable and writable by the owner only).
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Step 5: Download the dataset
# (the next step expects the archive us-cities-housing-market-data.zip in the working directory).
!kaggle datasets download -d vincentvaseghi/us-cities-housing-market-data

In [None]:
# Step 6: Unzip the dataset
!unzip us-cities-housing-market-data.zip

## Convert the TSV file to CSV format

In [None]:
# Step 7: Import necessary libraries
import pandas as pd

In [None]:
# Load the tab-separated market tracker file into a pandas DataFrame.
tsv_path = 'city_market_tracker.tsv000'
csv_table = pd.read_table(tsv_path, sep='\t')

In [None]:
# Write the table out as redfin.csv; index=False leaves the pandas row index out of the file.
csv_table.to_csv('redfin.csv', index=False)

In [None]:
import os
# Print the current working directory (redfin.csv was written with a relative path).
print(os.getcwd())

In [None]:
# Move the generated CSV into the mounted Google Drive folder.
!mv redfin.csv /content/drive/MyDrive

# Install PySpark and the required packages

In [None]:
#@title Install all the appropriate packages

!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9c16d176ea5dbc77b88c646e48f4e5653f65dae0c77e9f4f80b9bb819c77eb93
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [None]:
#@title Import and create Spark Context

# Let's import the libraries we will need
import itertools

import pyspark
# NOTE(review): the wildcard import from pyspark.sql.functions shadows Python
# built-ins such as sum/min/max; it is kept because other cells may rely on it.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import os

# Point PySpark at the OpenJDK 8 installation.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Reuse the active context when this cell is executed again; constructing
# SparkContext directly raises if a context is already running.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
from pyspark import SparkFiles

In [None]:
from pyspark.sql.functions import year, col

In [None]:
# URL of the CSV file (hosted copy of redfin.csv, so the Kaggle download above can be skipped)
url = "https://storage.googleapis.com/cs7280-proj3/redfin.csv"

In [None]:
# Register the remote CSV with the SparkContext so it is fetched for this Spark job.
sc.addFile(url)

In [None]:
# Resolve the local path of the file that was registered through sc.addFile.
path = SparkFiles.get("redfin.csv")

In [None]:
# Read the local copy into a Spark DataFrame: the first row supplies the column
# names and the column types are inferred from the data.
df = spark.read.csv(
    f"file://{path}",
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first rows of the loaded DataFrame.
df.show()

+------------+----------+---------------+-----------+--------------+--------+----------------------+--------------------+------------------+-------------+----------+--------------------+----------------+-----------------+---------------------+---------------------+-----------------+---------------------+---------------------+------------------+-------------------+-------------------+------------------+--------------------+--------------------+----------+-------------------+-------------------+-------------+-------------------+-------------------+------------+-------------------+-------------------+---------+-------------------+-------------------+----------------+--------------------+--------------------+----------+--------------+--------------+------------------+--------------------+--------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-----------------------+---------------------------+--------------

In [None]:
# Unique (state, state_code) combinations present in the data.
distinct_states_df = df.select("state", "state_code").distinct()

In [None]:
# Display every distinct state row, without truncating long values.
n_distinct_states = distinct_states_df.count()
distinct_states_df.show(n=n_distinct_states, truncate=False)

In [None]:
# Keep only the fields used below; the year is extracted from period_begin.
sold_inventory = df.select(
    year(col("period_begin")).alias("year"),
    col("homes_sold"),
    col("inventory"),
    col("state"),
)

In [None]:
# difference = inventory - homes_sold
inventory_minus_sold = sold_inventory["inventory"] - sold_inventory["homes_sold"]
sold_inventory = sold_inventory.withColumn("difference", inventory_minus_sold)

In [None]:
# Discard rows whose difference is null.
required_columns = ["difference"]
sold_inventory = sold_inventory.na.drop(how="any", subset=required_columns)

In [None]:
# Preview the rows that remain after dropping null differences.
sold_inventory.show()

In [None]:
# Switch from the DataFrame API to the underlying RDD.
# NOTE(review): this rebinds sold_inventory from a DataFrame to an RDD, so this
# cell (and the DataFrame cells on sold_inventory) cannot simply be re-run afterwards.
sold_inventory = sold_inventory.rdd

In [None]:
# Peek at the first five records of the RDD.
sold_inventory.take(5)

In [None]:
# sort by year and sort by differences
def _year_then_largest_difference(record):
    """Sort key: ascending year (field 0), then descending difference (field 4)."""
    return (record[0], -record[4])

sorted_sold_inventory = sold_inventory.sortBy(_year_then_largest_difference)

In [None]:
# Peek at the first five records after sorting.
sorted_sold_inventory.take(5)

In [None]:
# Mean difference per (year, state), flattened to (year, state, mean_difference).
grouped_by_year_state = (
    sorted_sold_inventory
    # key each record by (year, state) and carry (difference, 1) for averaging
    .map(lambda record: ((record[0], record[3]), (record[4], 1)))
    # add up the differences and the record counts per key
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    # total / count = mean difference
    .mapValues(lambda total_and_count: (total_and_count[0] / total_and_count[1]))
    # unpack the composite key into a flat tuple
    .map(lambda keyed: (keyed[0][0], keyed[0][1], keyed[1]))
)

In [None]:
# Peek at ten of the (year, state, mean difference) tuples.
grouped_by_year_state.take(10)

In [None]:
# Take the top 5 differences for each year
def _largest_five(records):
    """Return up to five (year, state, mean) tuples with the highest mean, largest first."""
    return sorted(records, key=lambda rec: -rec[2])[:5]

top_5_by_year = (
    grouped_by_year_state
    .groupBy(lambda rec: rec[0])
    .flatMapValues(_largest_five)
    # (year, (year, state, mean)) -> (year, state, mean)
    .map(lambda keyed: (keyed[0], keyed[1][1], keyed[1][2]))
)

In [None]:
# Bring the per-year top-5 tuples to the driver and display them.
top_5_by_year.collect()

In [None]:
# Take the bottom 5 differences for each year
def _smallest_five(records):
    """Return up to five (year, state, mean) tuples with the lowest mean.

    The records are sorted by mean in descending order and the tail is taken,
    so the result is still ordered from larger to smaller. The slice has to
    run to the end of the list ([-5:]); stopping at -1 would drop the very
    smallest value and yield only four rows.
    """
    return sorted(records, key=lambda rec: -rec[2])[-5:]

bottom_5_by_year = (
    grouped_by_year_state
    .groupBy(lambda rec: rec[0])
    .flatMapValues(_smallest_five)
    # (year, (year, state, mean)) -> (year, state, mean)
    .map(lambda keyed: (keyed[0], keyed[1][1], keyed[1][2]))
)

In [None]:
# import matplotlib.pyplot as plt

# # Collect the data from the RDD
# data = top_5_by_year.collect()

# # Prepare data for plotting
# years = [row[0] for row in data]
# states = [row[1] for row in data]
# differences = [row[2] for row in data]

# # Plotting
# plt.figure(figsize=(10, 6))

# # Create a color map for states
# cmap = plt.get_cmap('tab10')

# # Iterate over each unique state and plot its differences for each year
# for i, state in enumerate(sorted(set(states))):
#     state_data = [data[j] for j in range(len(data)) if data[j][1] == state]
#     years = [row[0] for row in state_data]
#     differences = [row[2] for row in state_data]
#     plt.scatter(years, differences, label=state, color=cmap(i))

# plt.title('Top 5 Differences for Each Year (Color-coded by State)')
# plt.xlabel('Year')
# plt.ylabel('Difference')
# plt.xticks(rotation=45)
# plt.legend(title='State', loc='upper left')
# plt.tight_layout()
# plt.show()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Bring the (year, state, mean difference) tuples to the driver.
data = top_5_by_year.collect()

# Create DataFrame.
# A dedicated name is used on purpose: binding the pandas frame to `df` would
# shadow the Spark DataFrame `df` and break any earlier cell that is re-run.
top_5_pdf = pd.DataFrame(data, columns=['Year', 'State', 'Value'])

# Pivot the data to have states as columns and years as rows
pivot_df = top_5_pdf.pivot(index='Year', columns='State', values='Value')

# Rank the values in descending order (1 = highest value)
ranked_df = pivot_df.rank(axis=1, ascending=False, method='first')

# Plotting
fig, ax = plt.subplots(figsize=(15, 10))
colors = plt.get_cmap('tab10')
n_states = len(ranked_df.columns)

for position, column in enumerate(ranked_df.columns):
    line, = ax.plot(ranked_df.index, ranked_df[column], label=column, marker='o', color=colors(position / n_states))
    for idx in ranked_df.index:
        value = pivot_df.at[idx, column]
        # A state that is not in this year's selection has no value: nothing to label.
        if pd.isna(value):
            continue
        ax.annotate(f'{value:.0f}',
                    xy=(idx, ranked_df.at[idx, column]),
                    textcoords="offset points",
                    xytext=(0, 5),
                    ha='center',
                    fontsize=10,
                    color=line.get_color())

ax.set_xlabel('Year')
ax.set_ylabel('Rank')
ax.set_title('Top five states with biggest inventory surplus from 2012 to 2024', pad=10, fontsize=22)
ax.invert_yaxis()  # Invert y axis so that rank 1 is at the top
ax.legend(title='State')

# Set y-axis to show only integer ticks
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))

plt.show()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Bring the (year, state, mean difference) tuples to the driver.
data = bottom_5_by_year.collect()

# Create DataFrame.
# A dedicated name is used on purpose: binding the pandas frame to `df` would
# shadow the Spark DataFrame `df` and break any earlier cell that is re-run.
bottom_5_pdf = pd.DataFrame(data, columns=['Year', 'State', 'Value'])

# Pivot the data to have states as columns and years as rows
pivot_df = bottom_5_pdf.pivot(index='Year', columns='State', values='Value')

# Rank the values in descending order (1 = highest value)
ranked_df = pivot_df.rank(axis=1, ascending=False, method='first')

# Plotting
fig, ax = plt.subplots(figsize=(15, 10))
colors = plt.get_cmap('tab10')
n_states = len(ranked_df.columns)

for position, column in enumerate(ranked_df.columns):
    line, = ax.plot(ranked_df.index, ranked_df[column], label=column, marker='o', color=colors(position / n_states))
    for idx in ranked_df.index:
        value = pivot_df.at[idx, column]
        # A state that is not in this year's selection has no value: nothing to label.
        if pd.isna(value):
            continue
        ax.annotate(f'{value:.0f}',
                    xy=(idx, ranked_df.at[idx, column]),
                    textcoords="offset points",
                    xytext=(0, 5),
                    ha='center',
                    fontsize=10,
                    color=line.get_color())

ax.set_xlabel('Year')
ax.set_ylabel('Rank')
# This chart shows the bottom-5 data, so the title must not repeat the top-5 wording.
ax.set_title('Bottom five states with smallest inventory surplus from 2012 to 2024', pad=10, fontsize=22)
ax.invert_yaxis()  # Invert y axis so that rank 1 is at the top
ax.legend(title='State', loc='upper left')

# Set y-axis to show only integer ticks
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))

plt.show()

In [None]:
# Stop the Spark session once the analysis is finished.
spark.stop()