<a href="https://colab.research.google.com/github/chouhandiksha/bigdataproject/blob/colab/notebooks/Analyse_poverty_mobility_2019_NY.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Spark SQL Documentation:** 
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

In [19]:
!pip install ipython-autotime

%load_ext autotime

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 2.73 s (started: 2021-04-06 20:16:53 +00:00)


In [20]:
# Install required dependancies
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u282-b08-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 30 not upgraded.
time: 7.28 s (started: 2021-04-06 20:17:01 +00:00)


In [None]:
# Import modules
# Numerical, tabular and plotting libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
%matplotlib inline

# Spark
import pyspark
# NOTE(review): the wildcard imports below place every public name of these modules in the
# notebook namespace. pyspark.sql.functions exports names such as sum/min/max/round, which then
# shadow the Python builtins of the same name — confirm nothing relies on the builtins.
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

time: 121 ms (started: 2021-04-06 19:38:40 +00:00)


In [21]:
# Altair is the charting library used for the plots in this notebook
import altair as alt
# Turn off Altair's maximum-rows check so charts can be built from larger data frames
alt.data_transformers.disable_max_rows()

DataTransformerRegistry.enable('default')

time: 9.61 ms (started: 2021-04-06 20:17:29 +00:00)


In [23]:
# Spark configuration: serve the Spark UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# Create the context, or reuse the live one when this cell is executed again.
# Calling the SparkContext constructor a second time in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once", which made this cell fail on re-run.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# Create (or fetch) the session that sits on top of that context
spark = SparkSession.builder.getOrCreate()

time: 688 ms (started: 2021-04-06 20:17:51 +00:00)


In [24]:
# Mount Google Drive at /content/drive so the project's data files can be read from this runtime.
# This import only exists inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
time: 2.76 ms (started: 2021-04-06 20:18:01 +00:00)


In [None]:
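# Optional sanity checks — uncomment to list the input files on the mounted drive
# Demographic data (NY)
# !ls drive/MyDrive/big-data-project/data/clean-data/ny/ny.csv

# Social data (NY, 2019)
# !ls drive/MyDrive/big-data-project/data/clean-data/ny/social/2019/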


time: 1.38 ms (started: 2021-04-06 19:39:38 +00:00)


In [25]:
# City sub-folder to analyse
city = 'ny'
# Root of the cleaned data, relative to the working directory; per-city inputs live at path/city
path = Path('drive/MyDrive/big-data-project/data/clean-data')

time: 2.85 ms (started: 2021-04-06 20:18:10 +00:00)


In [26]:
# Load every 2019 social CSV of the selected city into one Spark DataFrame
# (despite its name, rdd_soc is a DataFrame, not an RDD).
# The first line of each file is the header; '"' is both the quote and the escape character.
soc_files = str(path/city/'social/2019/*.csv')
csv_reader = (
    spark.read.format('csv')
    .option('header','true')
    .option('quote',"\"")
    .option('escape',"\"")
)
rdd_soc = csv_reader.load(soc_files)
rdd_soc.show()

+---+---+----------------+--------------+------------+---------------------------+--------------------------+------------------------------------------+----------------------------+----------------------+------------------------+--------------------+-------------------------------+-------------------------------+----------------+-------------------------+--------------------------+----------------------+----------------------------+---------------------------+-----------------------------+--------------------+------------------------+--------------------------------+
|_c0|cbg|date_range_start|date_range_end|device_count|distance_traveled_from_home|bucketed_distance_traveled|median_dwell_at_bucketed_distance_traveled|completely_home_device_count|median_home_dwell_time|bucketed_home_dwell_time|at_home_by_each_hour|part_time_work_behavior_devices|full_time_work_behavior_devices|destination_cbgs|delivery_behavior_devices|median_non_home_dwell_time|candidate_device_count|bucketed_away_from_

In [27]:
# View the schema of the raw social data (column names, types and nullability)
rdd_soc.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cbg: string (nullable = true)
 |-- date_range_start: string (nullable = true)
 |-- date_range_end: string (nullable = true)
 |-- device_count: string (nullable = true)
 |-- distance_traveled_from_home: string (nullable = true)
 |-- bucketed_distance_traveled: string (nullable = true)
 |-- median_dwell_at_bucketed_distance_traveled: string (nullable = true)
 |-- completely_home_device_count: string (nullable = true)
 |-- median_home_dwell_time: string (nullable = true)
 |-- bucketed_home_dwell_time: string (nullable = true)
 |-- at_home_by_each_hour: string (nullable = true)
 |-- part_time_work_behavior_devices: string (nullable = true)
 |-- full_time_work_behavior_devices: string (nullable = true)
 |-- destination_cbgs: string (nullable = true)
 |-- delivery_behavior_devices: string (nullable = true)
 |-- median_non_home_dwell_time: string (nullable = true)
 |-- candidate_device_count: string (nullable = true)
 |-- bucketed_away_from_home_ti

In [None]:
# # Take small sample of data to experiment with
# rdd_soc = rdd_soc.limit(100)
# rdd_soc.show()

time: 1.33 ms (started: 2021-04-06 19:41:23 +00:00)


In [28]:
# Expose the raw social data to Spark SQL as view 'clean_ny'
rdd_soc.createOrReplaceTempView('clean_ny')

# Keep only the block-group id, the period start and the four device counts used below
columns_query = (
    'SELECT cbg, date_range_start, device_count, '
    'completely_home_device_count, '
    'part_time_work_behavior_devices, '
    'full_time_work_behavior_devices '
    'FROM clean_ny'
)
rdd_soc = spark.sql(columns_query)
rdd_soc.show()

+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+
|cbg|date_range_start|device_count|completely_home_device_count|part_time_work_behavior_devices|full_time_work_behavior_devices|
+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+
+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+

time: 2.89 s (started: 2021-04-06 20:21:41 +00:00)


In [29]:
# View the schema of the reduced frame (only the selected columns remain)
rdd_soc.printSchema()

root
 |-- cbg: string (nullable = true)
 |-- date_range_start: string (nullable = true)
 |-- device_count: string (nullable = true)
 |-- completely_home_device_count: string (nullable = true)
 |-- part_time_work_behavior_devices: string (nullable = true)
 |-- full_time_work_behavior_devices: string (nullable = true)

time: 1.76 ms (started: 2021-04-06 20:21:50 +00:00)


In [30]:
# Add three percentage columns: each device-count column divided by device_count, times 100.
# Pairs are (new column, source count column); the order fixes the order of the added columns.
percentage_columns = [
    ('completely_home_percentage', 'completely_home_device_count'),
    ('part_time_work_percentage', 'part_time_work_behavior_devices'),
    ('full_time_work_percentage', 'full_time_work_behavior_devices'),
]
for percentage_name, count_name in percentage_columns:
    share = rdd_soc[count_name] / rdd_soc['device_count']
    rdd_soc = rdd_soc.withColumn(percentage_name, share * 100)

rdd_soc.show()

+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+--------------------------+-------------------------+-------------------------+
|cbg|date_range_start|device_count|completely_home_device_count|part_time_work_behavior_devices|full_time_work_behavior_devices|completely_home_percentage|part_time_work_percentage|full_time_work_percentage|
+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+--------------------------+-------------------------+-------------------------+
+---+----------------+------------+----------------------------+-------------------------------+-------------------------------+--------------------------+-------------------------+-------------------------+

time: 3.03 s (started: 2021-04-06 20:21:55 +00:00)


In [31]:
# View the schema again to check the newly added percentage columns and their types
rdd_soc.printSchema()

root
 |-- cbg: string (nullable = true)
 |-- date_range_start: string (nullable = true)
 |-- device_count: string (nullable = true)
 |-- completely_home_device_count: string (nullable = true)
 |-- part_time_work_behavior_devices: string (nullable = true)
 |-- full_time_work_behavior_devices: string (nullable = true)
 |-- completely_home_percentage: double (nullable = true)
 |-- part_time_work_percentage: double (nullable = true)
 |-- full_time_work_percentage: double (nullable = true)

time: 9.59 ms (started: 2021-04-06 20:22:00 +00:00)


In [32]:
# Register the frame (now including the percentage columns) as SQL view 'new_clean_ny'
rdd_soc.createOrReplaceTempView('new_clean_ny')

time: 21.6 ms (started: 2021-04-06 20:22:03 +00:00)


In [33]:
# Mean completely-home percentage per cbg, registered as view 'mean_completely_home'
completely_home_query = (
    'SELECT cbg, AVG(completely_home_percentage) AS mean_completely_home_percentage '
    'FROM new_clean_ny '
    'GROUP BY cbg'
)
rdd_soc_ny = spark.sql(completely_home_query)
rdd_soc_ny.createOrReplaceTempView('mean_completely_home')
rdd_soc_ny.show()

+---+-------------------------------+
|cbg|mean_completely_home_percentage|
+---+-------------------------------+
+---+-------------------------------+

time: 3.77 s (started: 2021-04-06 20:22:07 +00:00)


In [None]:
# Mean full-time-work percentage per cbg, registered as view 'mean_fulltime_work'
full_time_query = (
    'SELECT cbg, AVG(full_time_work_percentage) AS mean_full_time_work_percentage '
    'FROM new_clean_ny '
    'GROUP BY cbg'
)
rdd_soc_ft = spark.sql(full_time_query)
rdd_soc_ft.createOrReplaceTempView('mean_fulltime_work')
rdd_soc_ft.show()

+---+------------------------------+
|cbg|mean_full_time_work_percentage|
+---+------------------------------+
+---+------------------------------+

time: 4.75 s (started: 2021-04-06 19:48:19 +00:00)


In [34]:
# Mean part-time-work percentage per cbg, registered as view 'mean_parttime_work'
part_time_query = (
    'SELECT cbg, AVG(part_time_work_percentage) AS mean_part_time_work_percentage '
    'FROM new_clean_ny '
    'GROUP BY cbg'
)
rdd_soc_pt = spark.sql(part_time_query)
rdd_soc_pt.createOrReplaceTempView('mean_parttime_work')
rdd_soc_pt.show()

+---+------------------------------+
|cbg|mean_part_time_work_percentage|
+---+------------------------------+
+---+------------------------------+

time: 3.63 s (started: 2021-04-06 20:26:05 +00:00)


In [None]:
# Combine the three per-cbg means into one row per cbg.
# Inner joins: a cbg must be present in all three views to appear in the result.
mobility_query = (
    'SELECT mean_completely_home.cbg, '
    'mean_completely_home.mean_completely_home_percentage, '
    'mean_fulltime_work.mean_full_time_work_percentage , '
    'mean_parttime_work.mean_part_time_work_percentage  '
    'FROM mean_completely_home '
    'INNER JOIN mean_fulltime_work ON mean_completely_home.cbg = mean_fulltime_work.cbg '
    'INNER JOIN mean_parttime_work ON mean_completely_home.cbg = mean_parttime_work.cbg'
)
rdd_mobility = spark.sql(mobility_query)

In [None]:
# Uncomment to preview the joined per-cbg means
#rdd_mobility.show()
# Register the joined means as SQL view 'mobility' for the join with the demographic data
rdd_mobility.createOrReplaceTempView('mobility')

In [None]:
# Read the city's demographic (poverty) CSV into a Spark DataFrame
# (despite its name, rdd_demographic is a DataFrame, not an RDD).
# The first line is the header; '"' is both the quote and the escape character.
demographic_file = str(path/city/'ny.csv')
demographic_reader = (
    spark.read.format('csv')
    .option('header','true')
    .option('quote',"\"")
    .option('escape',"\"")
)
rdd_demographic = demographic_reader.load(demographic_file)

# Make it queryable from Spark SQL as view 'demographic'
rdd_demographic.createOrReplaceTempView('demographic')
rdd_demographic.show()

In [None]:
# Attach the three mobility means to every demographic row with a matching cbg
# (inner join: rows without a match on either side are dropped).
join_query = (
    'SELECT demographic.*, '
    'mobility.mean_completely_home_percentage, '
    'mobility.mean_full_time_work_percentage, '
    'mobility.mean_part_time_work_percentage '
    'FROM mobility '
    'INNER JOIN demographic ON mobility.cbg = demographic.cbg'
)
result = spark.sql(join_query)
result.createOrReplaceTempView('demographic_mobility')
result.show()

In [None]:
# Number of joined rows (COUNT(cbg) counts the rows whose cbg is not null)
row_count = spark.sql('SELECT COUNT(cbg) FROM demographic_mobility')
row_count.show()

In [None]:
# Sort by poverty, highest first. poverty_percentage is cast to float in the ORDER BY so the
# rows are ordered numerically.
result = spark.sql('SELECT * FROM demographic_mobility ORDER BY CAST(poverty_percentage AS float) DESC')
# Register the sorted frame under its own name instead of overwriting 'demographic_mobility'.
# The sorted frame is defined in terms of that view, so replacing the view with it makes the
# view refer to itself: some Spark 3.x releases reject this with "Recursive view ... detected",
# and every re-run of this cell would stack another sort on top of the previous one.
# 'demographic_mobility' keeps pointing at the unsorted join.
result.createOrReplaceTempView('demographic_mobility_sorted')
result.show()

In [None]:
# Collect the Spark result into a pandas DataFrame; the cells below use it for binning and charts
result_df = result.toPandas()
# Display the frame
result_df

In [None]:
# Scatter plot (one point per row of result_df): poverty percentage on x,
# mean completely-home percentage on y, with the cbg shown in the tooltip
poverty_scatter = alt.Chart(result_df).mark_point().encode(
    x=alt.X('poverty_percentage:Q'),
    y=alt.Y('mean_completely_home_percentage:Q'),
    tooltip=['cbg','poverty_percentage','mean_completely_home_percentage'],
)
poverty_scatter.properties(width=400, height=400).interactive()

In [None]:
# Bin the rows into 10-point poverty bands (0-10, 10-20, ..., 90-100) and average the numeric
# columns inside each band.
poverty_bins = pd.cut(
    result_df['poverty_percentage'].astype(float),
    bins=np.arange(0, 110, 10),
    # The bins are right-closed, so without include_lowest a value of exactly 0 falls outside
    # the first bin and the row is silently dropped. include_lowest moves the first bin's lower
    # edge slightly below 0 so those rows are counted.
    include_lowest=True,
)
result_mean_df = (
    result_df
    # observed=False keeps bands that contain no rows, independent of the pandas default
    .groupby(poverty_bins, observed=False)
    # numeric_only=True averages only numeric columns; on recent pandas a plain .mean() raises
    # a TypeError when the frame also holds string columns
    .mean(numeric_only=True)
    # turn the band index back into a regular 'poverty_percentage' column
    .reset_index()
)

In [None]:
# Replace the poverty bins with their string form
# (presumably so the chart can use them as plain category labels — confirm)
result_mean_df['poverty_percentage'] = result_mean_df['poverty_percentage'].astype(str)

In [None]:
# Colour code per poverty band: '#45a0d1' when the mean completely-home percentage is above 30,
# '#ff4833' otherwise (missing values fail the comparison and therefore get '#ff4833')
home_share = result_mean_df['mean_completely_home_percentage']
result_mean_df['color'] = np.where(home_share > 30, '#45a0d1', '#ff4833')
result_mean_df

In [None]:
# Number of rows (poverty bands) in the aggregated frame; used as the loop bound below
count = len(result_mean_df)
print(count)

In [None]:
# Reshape the aggregated frame into long form for Altair: one record per (mobility column, band)
# with the band's row index as 'poverty', the column name as 'column' and its mean as 'value'
mobility_columns = ['mean_completely_home_percentage','mean_full_time_work_percentage','mean_part_time_work_percentage']
records = [
    {'poverty': band, 'column': name, 'value': result_mean_df[name][band]}
    for name in mobility_columns
    for band in range(0,count)
]
viz_df = alt.Data(values=records)

In [None]:
# Visualizing data by mobility type: one line per mobility column across the poverty bands

# The social data loaded in this notebook comes from the 'social/2019' folder, so the chart is
# titled with 2019 (the title previously said 2020).
title = '{} {}'.format(city,'2019')
alt.Chart(viz_df, title=title).mark_line().encode(
    x=alt.X('poverty:O', title='poverty_percentage'),
    # ':Q' already declares the field quantitative, so no separate type argument is needed
    y=alt.Y('value:Q', title='Percentage'),
    color='column:N'
).properties(
    width=300,
    height=250
).interactive()

In [None]:
# Point chart of the mean completely-home percentage per poverty band.
# Each point uses the literal colour stored in the 'color' column (scale=None disables the
# colour scale so the hex codes are used as-is).
banded_chart = alt.Chart(result_mean_df).mark_point().encode(
    x=alt.X('poverty_percentage:N'),
    y=alt.Y('mean_completely_home_percentage'),
    color=alt.Color('color',scale=None),
    tooltip=[
        alt.Tooltip('poverty_percentage'),
        alt.Tooltip('mean_completely_home_percentage'),
    ],
)
banded_chart.properties(width=400, height=400).interactive()

In [None]:
# Sanity check: dtype of poverty_percentage after casting it to float (result_df is not modified)
result_df['poverty_percentage'].astype(float).dtype