In [None]:
#Import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
from google.cloud import storage

#Authenticate User
# Runs the Colab authentication flow before any Google Cloud client is created.
# NOTE(review): presumably this is what lets storage.Client() in a later cell
# pick up credentials without an explicit key file — confirm.
from google.colab import auth
auth.authenticate_user()
# Only reached if authenticate_user() returned without raising.
print('Authenticated')

Authenticated


In [None]:
#Install Java and Spark
# NOTE: every command below is commented out, so running this cell installs nothing.
# The commands are kept for reference only (OpenJDK 8, the Spark 3.0.0 / Hadoop 3.2
# tarball, and findspark).
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# !tar xf spark-3.0.0-bin-hadoop3.2.tgz
# !pip install -q findspark

In [None]:
#Install PySpark
# NOTE: commented out, so this cell is a no-op; pyspark must already be available
# in the runtime for the later `from pyspark...` imports to succeed.
# !pip install pyspark

In [None]:
# Initialize a GCS client
storage_client = storage.Client()

# Define bucket and file paths (object names inside the bucket)
landing_folder_name = 'cleaned'
input_file_name = 'parquet nypd_mv_collisions'
output_folder_name = 'trusted'
output_file_name = 'tested nypd_mv_collisions'

# Download the input object from the bucket to a local temp file.
landing_bucket = storage_client.bucket('my-project-bucket-bc')
landing_blob = landing_bucket.blob(f'{landing_folder_name}/{input_file_name}')
# The object is read with pd.read_parquet() in the next cell, so give the local
# copy a .parquet extension (it was previously saved as "input_file.csv",
# which misrepresented the file format).
local_file_path = '/tmp/input_file.parquet'
landing_blob.download_to_filename(local_file_path)

In [None]:
# Load the file downloaded in the previous cell into a pandas DataFrame;
# the file is parsed as Parquet.
df = pd.read_parquet(local_file_path)

In [None]:
import importlib.util
import os

# Point HADOOP_CONF_DIR at the installed pyspark package directory.
# The directory is looked up from the package metadata rather than hard-coded,
# because the site-packages path contains the interpreter version
# ("python3.10") and silently goes stale when the runtime's Python is upgraded.
_pyspark_spec = importlib.util.find_spec("pyspark")
if _pyspark_spec is not None and _pyspark_spec.submodule_search_locations:
    _pyspark_dir = list(_pyspark_spec.submodule_search_locations)[0]
else:
    # pyspark could not be located; fall back to the path this notebook used originally.
    _pyspark_dir = "/usr/local/lib/python3.10/dist-packages/pyspark"

os.environ["HADOOP_CONF_DIR"] = _pyspark_dir

In [None]:
from pyspark.sql import SparkSession

# Spark / Hadoop settings applied to the session, in order:
#   - GCS connector filesystem implementation and service-account auth,
#   - driver and executor memory.
# NOTE(review): "fs.gs.project.id" is set to the same string used as the bucket
# name elsewhere in this notebook — confirm it is really the project id.
_spark_settings = [
    ("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("spark.hadoop.fs.gs.project.id", "my-project-bucket-bc"),
    ("spark.hadoop.fs.gs.auth.service.account.enable", "true"),
    ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "cis-4130-semester-project-18c39c58bea4.json"),
    ("spark.hadoop.fs.gs.auth.service.account.json.keyfile", "cis-4130-semester-project-18c39c58bea4.json"),
    ("spark.driver.memory", "4g"),
    ("spark.executor.memory", "4g"),
]

# Build the session step by step: name the app, apply each setting, then
# create the session (or reuse one that already exists).
_session_builder = SparkSession.builder.appName("NYPD Collisions Analysis")
for _setting_key, _setting_value in _spark_settings:
    _session_builder = _session_builder.config(_setting_key, _setting_value)

spark = _session_builder.getOrCreate()

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame.
# No explicit schema is passed here, so column types are whatever Spark derives
# from the pandas data.
spark_df = spark.createDataFrame(df)

In [None]:
# Print a tabular preview of the Spark DataFrame to the cell output.
spark_df.show()


+---------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+--------------------+-------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+----------------------------+-------------------------+------------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  borough|contributing_factor_vehicle_1|contributing_factor_vehicle_2|contributing_factor_vehicle_3|contributing_factor_vehicle_4|contributing_factor_vehicle_5|   cross_street_name|          timestamp|number_of_cyclist_injured|number_of_cyclist_killed|number_of_motorist_injured|number_of_motorist_killed|number_of_pedestrians_injured|number_of_pedestrians_killed|number_of_persons_injured|number_of_persons_killed|unique_key|  vehicle_type_code1|  vehicle_type_code

In [None]:
# Print each column's name, Spark type and nullability.
spark_df.printSchema()

root
 |-- borough: string (nullable = true)
 |-- contributing_factor_vehicle_1: string (nullable = true)
 |-- contributing_factor_vehicle_2: string (nullable = true)
 |-- contributing_factor_vehicle_3: string (nullable = true)
 |-- contributing_factor_vehicle_4: string (nullable = true)
 |-- contributing_factor_vehicle_5: string (nullable = true)
 |-- cross_street_name: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- number_of_cyclist_injured: long (nullable = true)
 |-- number_of_cyclist_killed: long (nullable = true)
 |-- number_of_motorist_injured: long (nullable = true)
 |-- number_of_motorist_killed: long (nullable = true)
 |-- number_of_pedestrians_injured: long (nullable = true)
 |-- number_of_pedestrians_killed: long (nullable = true)
 |-- number_of_persons_injured: long (nullable = true)
 |-- number_of_persons_killed: long (nullable = true)
 |-- unique_key: long (nullable = true)
 |-- vehicle_type_code1: string (nullable = true)
 |-- vehicle_type_code2: 

In [None]:
from pyspark.sql.functions import to_timestamp

# Replace the string-typed `timestamp` column with a parsed timestamp column,
# using the pattern the strings are expected to follow.
_parsed_timestamp = to_timestamp(spark_df['timestamp'], 'yyyy-MM-dd HH:mm:ss')
spark_df = spark_df.withColumn('timestamp', _parsed_timestamp)

# Row count of the DataFrame (shown as the cell's output).
spark_df.count()

4001

In [None]:
# Import some modules we will need later on
from pyspark.sql.functions import col, isnan, when, count, udf, to_date, year, month, date_format, size, split
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [None]:
# Derive calendar features from the parsed `timestamp` column.
spark_df = (
    spark_df
    .withColumn("time_year", year("timestamp"))
    .withColumn("time_month", month("timestamp"))
    .withColumn("time_yearmonth", date_format("timestamp", "yyyy-MM"))
    # A single "E" produces the *abbreviated* day name ("Mon" ... "Sun").
    .withColumn("time_dayofweek", date_format("timestamp", "E"))
)

# Weekend indicator: 1.0 for Saturday/Sunday, 0.0 otherwise.
# The comparison must use the abbreviated names produced by the "E" pattern;
# comparing against 'Saturday' / 'Sunday' never matches and would leave this
# column at 0 for every row.
spark_df = spark_df.withColumn(
    "time_weekend",
    when(spark_df.time_dayofweek.isin("Sat", "Sun"), 1.0).otherwise(0.0),
)
spark_df.printSchema()

root
 |-- borough: string (nullable = true)
 |-- contributing_factor_vehicle_1: string (nullable = true)
 |-- contributing_factor_vehicle_2: string (nullable = true)
 |-- contributing_factor_vehicle_3: string (nullable = true)
 |-- contributing_factor_vehicle_4: string (nullable = true)
 |-- contributing_factor_vehicle_5: string (nullable = true)
 |-- cross_street_name: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- number_of_cyclist_injured: long (nullable = true)
 |-- number_of_cyclist_killed: long (nullable = true)
 |-- number_of_motorist_injured: long (nullable = true)
 |-- number_of_motorist_killed: long (nullable = true)
 |-- number_of_pedestrians_injured: long (nullable = true)
 |-- number_of_pedestrians_killed: long (nullable = true)
 |-- number_of_persons_injured: long (nullable = true)
 |-- number_of_persons_killed: long (nullable = true)
 |-- unique_key: long (nullable = true)
 |-- vehicle_type_code1: string (nullable = true)
 |-- vehicle_type_code

In [None]:
# (column name, Spark SQL type name) pairs, e.g. ('borough', 'string').
columns_info = list(spark_df.dtypes)

# DataFrame.dtypes reports SQL type names ('int', 'double', 'bigint', ...),
# not class names, so the match must be against 'double' — the previous
# 'DoubleType' entry could never match.
_MODELER_TYPE_NAMES = ('int', 'double')


def _feature_treatment(type_name):
    """Return the feature-engineering treatment label for a Spark SQL type name.

    'string' columns are labelled 'Indexer', 'int' / 'double' columns
    'Modeler', and every other type 'Scaler'.
    """
    if type_name == 'string':
        return 'Indexer'
    if type_name in _MODELER_TYPE_NAMES:
        return 'Modeler'
    # NOTE(review): 'bigint' and 'timestamp' columns land here, as before —
    # confirm that 'Scaler' is the intended treatment for them.
    return 'Scaler'


# Determine feature engineering treatments
feature_engineering_treatments = [
    (col_name, col_data_type, _feature_treatment(col_data_type))
    for col_name, col_data_type in columns_info
]

# Create a new DataFrame to store the information
feature_info_df = spark.createDataFrame(
    feature_engineering_treatments,
    ["Column Name", "Column Type", "Feature Engineering Treatment"],
)

# Show the DataFrame
feature_info_df.show()

# Write the DataFrame out as CSV.
# NOTE(review): this is a hard-coded, user-specific Windows path; confirm it
# resolves to the intended location in the environment where the notebook runs.
feature_info_output_uri = 'file:///C:/Users/bella/Downloads/feature_info.csv'
feature_info_df.write.csv(feature_info_output_uri, header=True, mode='overwrite')

+--------------------+-----------+-----------------------------+
|         Column Name|Column Type|Feature Engineering Treatment|
+--------------------+-----------+-----------------------------+
|             borough|     string|                      Indexer|
|contributing_fact...|     string|                      Indexer|
|contributing_fact...|     string|                      Indexer|
|contributing_fact...|     string|                      Indexer|
|contributing_fact...|     string|                      Indexer|
|contributing_fact...|     string|                      Indexer|
|   cross_street_name|     string|                      Indexer|
|           timestamp|  timestamp|                       Scaler|
|number_of_cyclist...|     bigint|                       Scaler|
|number_of_cyclist...|     bigint|                       Scaler|
|number_of_motoris...|     bigint|                       Scaler|
|number_of_motoris...|     bigint|                       Scaler|
|number_of_pedestr...|   

In [None]:
# Get the schema of the DataFrame
schema = spark_df.schema

# Extract the data types from the schema
data_types = [field.dataType for field in schema]

# Convert the data types to string representations
data_type_strings = [str(data_type) for data_type in data_types]

# Get unique data type strings
unique_data_types = set(data_type_strings)

# Display the unique data types
print("Unique data types present in the DataFrame:")
for data_type in unique_data_types:
    print(data_type)

# Define function to determine variable type
def get_variable_type(col_data_type):
    if col_data_type == StringType():
        return 'Categorical'
    elif col_data_type == IntegerType() or col_data_type == DoubleType():
        return 'Continuous'
    elif col_data_type == BooleanType() or col_data_type == DateType():
        return 'Categorical'
    elif isinstance(col_data_type, ArrayType) and \
            (col_data_type.elementType == IntegerType() or col_data_type.elementType == DoubleType()):
        return 'Continuous'
    else:
        return 'Unknown'

# Get DataFrame columns info
columns_info = [(col_name, col_data_type) for col_name, col_data_type in zip(spark_df.columns, data_types)]

# Determine variable types
variable_types = [(col_name, get_variable_type(col_data_type)) for col_name, col_data_type in columns_info]

# Create a new DataFrame to store the information
variable_info_df = spark.createDataFrame(variable_types, ["Column Name", "Variable Type"])

# Show the DataFrame
variable_info_df.show()

# Check for any unknown types
unknown_types = variable_info_df.filter(variable_info_df["Variable Type"] == "Unknown")
if unknown_types.count() > 0:
    print("Columns with unknown types:")
    unknown_types.show()
    print("Data types of columns categorized as 'Unknown':")
    for row in unknown_types.collect():
        print(row["Column Name"], "-", row["Variable Type"])
else:
    print("All column types were successfully determined.")


Unique data types present in the DataFrame:
StringType()
TimestampType()
LongType()
DoubleType()
IntegerType()
+--------------------+-------------+
|         Column Name|Variable Type|
+--------------------+-------------+
|             borough|  Categorical|
|contributing_fact...|  Categorical|
|contributing_fact...|  Categorical|
|contributing_fact...|  Categorical|
|contributing_fact...|  Categorical|
|contributing_fact...|  Categorical|
|   cross_street_name|  Categorical|
|           timestamp|      Unknown|
|number_of_cyclist...|      Unknown|
|number_of_cyclist...|      Unknown|
|number_of_motoris...|      Unknown|
|number_of_motoris...|      Unknown|
|number_of_pedestr...|      Unknown|
|number_of_pedestr...|      Unknown|
|number_of_persons...|      Unknown|
|number_of_persons...|      Unknown|
|          unique_key|      Unknown|
|  vehicle_type_code1|  Categorical|
|  vehicle_type_code2|  Categorical|
| vehicle_type_code_3|  Categorical|
+--------------------+-------------+
o

In [None]:
# Convert the variable-type summary to pandas and render it with IPython's
# display() instead of Spark's text table.
from IPython.display import display
display(variable_info_df.toPandas())

Unnamed: 0,Column Name,Variable Type
0,borough,Categorical
1,contributing_factor_vehicle_1,Categorical
2,contributing_factor_vehicle_2,Categorical
3,contributing_factor_vehicle_3,Categorical
4,contributing_factor_vehicle_4,Categorical
5,contributing_factor_vehicle_5,Categorical
6,cross_street_name,Categorical
7,timestamp,Unknown
8,number_of_cyclist_injured,Unknown
9,number_of_cyclist_killed,Unknown


In [None]:
#To display all rows of the table in CoLab using IPython
from IPython.display import display
display(feature_info_df.toPandas())

Unnamed: 0,Column Name,Column Type,Feature Engineering Treatment
0,borough,string,Indexer
1,contributing_factor_vehicle_1,string,Indexer
2,contributing_factor_vehicle_2,string,Indexer
3,contributing_factor_vehicle_3,string,Indexer
4,contributing_factor_vehicle_4,string,Indexer
5,contributing_factor_vehicle_5,string,Indexer
6,cross_street_name,string,Indexer
7,timestamp,timestamp,Scaler
8,number_of_cyclist_injured,bigint,Scaler
9,number_of_cyclist_killed,bigint,Scaler


In [None]:
from pyspark.sql.functions import col, count
from scipy.stats import chi2_contingency

# Compute the contingency table (rows: pedestrians injured, columns: persons injured)
contingency_table = spark_df.crosstab('number_of_pedestrians_injured', 'number_of_persons_injured')

# Convert contingency table to Pandas DataFrame
contingency_table_pd = contingency_table.toPandas()

# Coerce everything to numbers, drop rows that contain a non-numeric entry,
# and make sure all remaining values are integers.
contingency_table_pd = (
    contingency_table_pd
    .apply(pd.to_numeric, errors='coerce')
    .dropna()
    .astype(int)
)

# The first crosstab column holds the ROW LABELS (the distinct values of the
# first variable), not counts. Move it into the index so that `.values` below
# contains observed counts only; leaving it in place feeds the labels to the
# test as if they were an extra column of counts.
label_column = contingency_table_pd.columns[0]
contingency_table_pd = contingency_table_pd.set_index(label_column)

# Perform chi-square test for independence on the observed counts
chi2_stat, p_val, dof, expected = chi2_contingency(contingency_table_pd.values)

# Flatten the expected frequencies array (one entry per table cell)
expected_flat = expected.flatten()
num_expected = len(expected_flat)

# The test statistic, p-value and degrees of freedom are single values for the
# whole test; they are repeated so each expected frequency gets its own row.
test_statistic = [chi2_stat] * num_expected
p_value = [p_val] * num_expected
degrees_of_freedom = [dof] * num_expected

# Create a DataFrame to hold the results
results_df = pd.DataFrame({
    "Test Statistic": test_statistic,
    "P-value": p_value,
    "Degrees of Freedom": degrees_of_freedom,
    "Expected Frequencies": expected_flat,
})

# Display the results DataFrame
display(results_df)


Unnamed: 0,Test Statistic,P-value,Degrees of Freedom,Expected Frequencies
0,3190.580704,0.0,90,0.043748
1,3190.580704,0.0,90,4.130251
2,3190.580704,0.0,90,1.845389
3,3190.580704,0.0,90,0.009943
4,3190.580704,0.0,90,0.001989
...,...,...,...,...
107,3190.580704,0.0,90,0.226696
108,3190.580704,0.0,90,0.098434
109,3190.580704,0.0,90,0.047726
110,3190.580704,0.0,90,0.008949


In [None]:
# Upload the local file to "<output_folder_name>/<output_file_name>" in the same
# bucket object that the input was read from (`landing_bucket`).
# NOTE(review): `local_file_path` is the file that was downloaded earlier; none
# of the cells shown in this notebook write to it, so the uploaded object
# appears to be an unmodified copy of the input rather than the transformed
# Spark DataFrame — confirm this is intended.
output_blob = landing_bucket.blob(f'{output_folder_name}/{output_file_name}')
output_blob.upload_from_filename(local_file_path)

# Only reached if the upload call returned without raising.
print("Uploaded to Trusted")

Uploaded to Trusted
