In [1]:
# All required Imports
!pip install boto3
!pip install censusdata
!pip install tabulate
!pip install geopandas
!pip install plotly

import geopandas as gpd
import plotly.express as px
import boto3
import os
print(os.environ["JAVA_HOME"])
print(os.environ["SPARK_HOME"])
import findspark
findspark.init()
import pandas as pd

from geopandas import GeoDataFrame
from shapely.geometry import Point

from pyspark.sql import SparkSession

C:\Java\jdk-1.8
C:\Users\Admin\Spark


In [2]:
# Defining the Spark interface
spark = SparkSession.builder.config('spark.executor.cores', '8').config('spark.executor.memory', '6g')\
        .config("spark.sql.session.timeZone", "UTC").config('spark.driver.memory', '6g').master("local[26]")\
        .appName("final-project-app").config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC').config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')\
        .config("spark.sql.datetime.java8API.enabled", "true").config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.autoBroadcastJoinThreshold", -1)\
        .config("spark.driver.maxResultSize", 0)\
        .config("spark.shuffle.spill", "true")\
        .getOrCreate()

In [3]:
# Paths to the data
trips_data = '../data/mds-trips-bird.parquet'
socio_economic_fp = '../data/socio_economic/2021_census_tract_davidson.geojson'

Removing all extra information from trips data

In [4]:
# Reading the data into a pandas dataframe
trips_df = pd.read_parquet(trips_data)

In [5]:
#Required columns: 
# start_time, end_time, geometry, actual_cost, trip_distance, trip_id, vehicle_id, start_location(Only the starting location)

In [6]:
# Getting only the required columns
trips_df = trips_df[['start_time', 'end_time', 'geometry', 'actual_cost', 'trip_distance', 'trip_id', 'vehicle_id']]

In [7]:
# Getting all the starting locations for all the trips as the first poistion in the linestring
trips_df['start_location'] = trips_df.geometry.apply(lambda x: x['coordinates'][0])

In [8]:
# Renaming the columns and dropping additional columns
trips_df['route_coordinates'] = trips_df['geometry']
trips_df['geometry'] = trips_df['start_location']
trips_df = trips_df.drop('start_location',axis=1)
# Getting the latitude and longitude data from the starting location (renamed to geometry) 
trips_df['start_longitude'] = trips_df.geometry.apply(lambda x: x[0])
trips_df['start_latitude'] = trips_df.geometry.apply(lambda x: x[1])

In [9]:
# Creating the shapely point to create a geopandas dataframe
geometry = [Point(xy) for xy in zip(trips_df.start_longitude, trips_df.start_latitude)]
# Dropping the columns that are not required
trips_df = trips_df.drop(['start_longitude', 'start_latitude'], axis=1)
# Converting the pandas dataframe into a geopandas dataframe 
trips_gdf = GeoDataFrame(trips_df, crs="EPSG:4326", geometry=geometry)

Removing all extra information from socio economic data

In [10]:
# Reading the data into a geopandas dataframe
socio_economic_df = gpd.read_file(socio_economic_fp)

In [11]:
# Getting the county names 
new_indices = []
county_names = []
for index in socio_economic_df.NAME.tolist():
        county_name = index.split(',')[1].strip().split(' ')[0].strip()
        county_names.append(county_name)
socio_economic_df['county_name'] = county_name

In [12]:
# Redefining the index as GEOID
socio_economic_df.index = socio_economic_df.GEOID
# List of columns required
socio_economic_cols = ['median_income_last12months','geometry','county_name']
# Getting only the required columns
socio_economic_df = socio_economic_df.drop(columns = [c for c in socio_economic_df.columns if c not in socio_economic_cols])

In [13]:
# Writing the final socio economic dataframe to the required location
socio_economic_df.to_parquet('../data/final_socio_economic_data.parquet')

In [14]:
# Reseting the index to create a join
socio_economic_df = socio_economic_df.reset_index()

### Now creating the spatial join

In [15]:
# Checking if the coordinate reference system is consistent before joining the data 
trips_gdf = trips_gdf.to_crs(socio_economic_df.crs)

In [16]:
# Creating the spatial join between trip starting position and socio economic data to get the area names 
joined_df = gpd.sjoin(trips_gdf, socio_economic_df, op='within')

  if (await self.run_code(code, result,  async_=asy)):


In [17]:
# Analyzing the data to check if everything is correct
joined_df

Unnamed: 0,start_time,end_time,geometry,actual_cost,trip_distance,trip_id,vehicle_id,route_coordinates,index_right,GEOID,median_income_last12months,county_name
0,1566471760475,1566471905287,POINT (-86.79949 36.14624),187.0,16,7ed9d904-594b-43af-ae64-7b58b54f1912,21BWU,"{'coordinates': [[-86.7994921, 36.1462415], [-...",99,47037016400,50855.0,Davidson
9,1566473212444,1566473686684,POINT (-86.79653 36.13306),332.0,2063,3322a6e3-36a7-405f-800e-ee5289e02857,Y7HMS,"{'coordinates': [[-86.79653275299017, 36.13305...",99,47037016400,50855.0,Davidson
13,1566473705562,1566473994897,POINT (-86.79699 36.14114),245.0,38,ad7d0cdd-fe9a-4dbd-ab57-f6c53928c931,WRZPW,"{'coordinates': [[-86.79699490432981, 36.14114...",99,47037016400,50855.0,Davidson
16,1566474469566,1566474587343,POINT (-86.79928 36.14049),187.0,947,098c2c24-e67c-42b2-b7d7-46b70eb1073d,UT5SM,"{'coordinates': [[-86.79928335369266, 36.14049...",99,47037016400,50855.0,Davidson
19,1566474403946,1566474787458,POINT (-86.80004 36.14117),303.0,1481,0fb35241-2d4e-4a79-975a-e4004302d0a6,DX55L,"{'coordinates': [[-86.80004419226664, 36.14117...",99,47037016400,50855.0,Davidson
...,...,...,...,...,...,...,...,...,...,...,...,...
90654,1595187076766,1595187210476,POINT (-86.60319 36.07186),300.0,21,dfbb202d-ba0c-4149-bbf4-e71611a473f7,I4464,"{'coordinates': [[-86.60319013354608, 36.07186...",168,47037015620,54391.0,Davidson
91929,1595680836415,1595681479000,POINT (-86.78141 36.19726),685.0,16218,9ade1d5f-d1c9-4643-af2f-7a71f88c0697,L4VHE,"{'coordinates': [[-86.78140778016248, 36.19725...",118,47037012702,60432.0,Davidson
91938,1595681843471,1595683666000,POINT (-86.78098 36.19704),1504.0,12944,587a6bc6-d64f-4fa4-9416-ceb719b5e35c,R7KNI,"{'coordinates': [[-86.78097753753937, 36.19703...",118,47037012702,60432.0,Davidson
92671,1595852310000,1595853407000,POINT (-86.72293 36.06338),880.0,24322,7af86728-88ad-4811-b493-e4efd671339e,MR1WY,"{'coordinates': [[-86.722925, 36.063384], [-86...",142,47037018904,65023.0,Davidson


In [18]:
# Dropping columns that are not required
joined_df = joined_df.drop(['route_coordinates','index_right'],axis=1)
# Removing duplicate rows
joined_df.drop_duplicates(inplace=True)

In [19]:
# Creating a temporary data frame with only GEOID and geometry to create a join as geometry column was missing 
temp = socio_economic_df[['GEOID','geometry']]
temp['geometry_polygon'] = temp['geometry']
temp = temp[['GEOID','geometry_polygon']]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  super().__setitem__(key, value)


In [20]:
# Joining on GEOID and getting the geometry column from socio economic data
joined_df = joined_df.join(temp.set_index('GEOID'),on='GEOID',how='inner')

In [21]:
# Storing the data as a parquet file to the required location
joined_df.to_parquet('../data/final_joined_data.parquet')