# Parsing
### We chose to parse the dataset from Stanford as it has more information and characteristics than the one from Kaggle. The dataset from Kaggle is a subset of the one from Stanford, so we decided to use the original one.

##### load dataset

In [35]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .appName('DataFrame Optimization')
    .getOrCreate()
)

# Read the CSV, treating its first line as the header row. No schema
# inference is requested here.
# NOTE(review): this is the same path that the final export cell of this
# notebook removes and rewrites -- confirm whether the raw Stanford export was
# meant to be loaded here instead.
df = spark.read.csv('../data/tx_statewide_2020_04_01-002_clean.csv', header=True)

# Preview the first rows
df.show()

+--------------------+------------------+-------------------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|            location|               lat|                lng|     county_name|subject_race|subject_sex|search_conducted|search_vehicle|vehicle_year|          timestamp|
+--------------------+------------------+-------------------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|route: 0010, mile...|29.650999069213867| -97.51920318603516| Gonzales County|           4|          1|               0|           0.0|           0|2008-08-12 13:46:00|
|route: 0010, mile...| 29.65060043334961| -97.50606536865234| Gonzales County|           4|          1|               0|           0.0|           0|2008-08-12 13:46:00|
|route: 0271, mile...|33.733150482177734| -95.54741668701172|    Lamar County|           0|          1|               0|           0.0|           0|2008-08

In [36]:
# Number of rows in the freshly loaded DataFrame
df.count()

19752786

### Cleaning the dataset

##### Remove columns with name starting with "raw_" as they are not useful for our analysis

In [37]:
# Columns whose name starts with 'raw_' are not needed for the analysis
raw_columns = [name for name in df.columns if name.startswith('raw_')]
df = df.drop(*raw_columns)

# Keep only the rows where both coordinates are present
df = df.na.drop(subset=['lat', 'lng'])

df.show()

+--------------------+------------------+-------------------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|            location|               lat|                lng|     county_name|subject_race|subject_sex|search_conducted|search_vehicle|vehicle_year|          timestamp|
+--------------------+------------------+-------------------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|route: 0010, mile...|29.650999069213867| -97.51920318603516| Gonzales County|           4|          1|               0|           0.0|           0|2008-08-12 13:46:00|
|route: 0010, mile...| 29.65060043334961| -97.50606536865234| Gonzales County|           4|          1|               0|           0.0|           0|2008-08-12 13:46:00|
|route: 0271, mile...|33.733150482177734| -95.54741668701172|    Lamar County|           0|          1|               0|           0.0|           0|2008-08

In [38]:
# Show the number of rows currently in the DataFrame
df.count()

11600407

##### Remove columns with more than 50% of missing values

In [39]:
from pyspark.sql.functions import col, count, when, isnan, isnull

# Total number of rows; denominator of the missing-value ratio
total_records = df.count()

# A single aggregation that counts, for every column, the values treated as
# missing: the literal strings 'NA' / 'na', NaN, and null
null_counts = df.select([
    count(when((col(c) == 'NA') | (col(c) == 'na') | isnan(c) | isnull(c), c)).alias(c)
    for c in df.columns
])

# Collect the one-row result ONCE. Calling null_counts.first() inside a
# per-column comprehension re-runs the whole aggregation over the dataset for
# every column.
null_counts_row = null_counts.first()
null_counts_dict = null_counts_row.asDict() if null_counts_row is not None else {}

# Drop columns where more than 50% of the values are missing. With an empty
# DataFrame there is no ratio to compute (and no division by zero), so
# nothing is dropped.
if total_records > 0:
    df = df.drop(*[c for c, null_count in null_counts_dict.items() if null_count / total_records > 0.5])

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\Theo4\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Theo4\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\Theo4\AppData\Local\Programs\Python\Python312\Lib\socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Print the per-column missing-value counts computed above
null_counts.show()

+--------+-------+-------+-----------+------------+-----------+----------------+--------------+------------+---------+
|location|    lat|    lng|county_name|subject_race|subject_sex|search_conducted|search_vehicle|vehicle_year|timestamp|
+--------+-------+-------+-----------+------------+-----------+----------------+--------------+------------+---------+
|      91|8152359|8152288|         99|           0|          0|            4385|        444674|           0|        0|
+--------+-------+-------+-----------+------------+-----------+----------------+--------------+------------+---------+



### remove useless columns

In [None]:
# Columns that carry no value for this analysis
# (citation_issued and warning_issued are considered meaningless)
useless_columns = [
    'officer_id_hash',
    'district',
    'region',
    'type',
    'citation_issued',
    'warning_issued',
]

# Columns dropped for now as an optimization.
# 'violation' is deliberately left in place (its drop is disabled).
deferred_columns = [
    'outcome',
    'vehicle_make',
    'vehicle_model',
    'vehicle_type',
]

# Remove everything in one call
df = df.drop(*useless_columns, *deferred_columns)

In [None]:
# Intermediate save: write the DataFrame as Parquet, overwriting any previous
# output at this path
df.write.mode('overwrite').parquet('../data/tx_statewide_2020_04_01-002.parquet')

Py4JJavaError: An error occurred while calling o344.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


### Integer (label) encoding of categorical columns

In [None]:
from pyspark.sql import SparkSession

# Get the active SparkSession, or create one if none exists
spark = (
    SparkSession.builder
    .appName('DataFrame Optimization')
    .getOrCreate()
)

# Reload the intermediate Parquet snapshot
df = spark.read.parquet('../data/tx_statewide_2020_04_01-002.parquet')

# Columns that are not used by the analysis
# (citation_issued and warning_issued are considered meaningless)
unused_columns = ['type', 'citation_issued', 'warning_issued']

# Columns dropped for now as an optimization.
# 'violation' is deliberately left in place (its drop is disabled).
deferred_columns = ['outcome', 'vehicle_make', 'vehicle_model', 'vehicle_type']

df = df.drop(*unused_columns, *deferred_columns)

In [None]:
# # show different values in type column
# df.select('search_conducted').distinct().show()

In [None]:
from pyspark.sql.functions import col, concat_ws, count, isnan, isnull, to_timestamp, when


def _encode_labels(column_name, codes, default):
    """Build an integer column expression that maps string labels to codes.

    column_name: name of the column to read.
    codes: dict of label -> integer code; labels are tested in insertion order.
    default: value used when no label matches (None produces a null).
    """
    source = col(column_name)
    expr = None
    for label, code in codes.items():
        matches = source == label
        expr = when(matches, code) if expr is None else expr.when(matches, code)
    return expr.otherwise(default).cast("integer")


# subject_sex: 1 for 'male', 0 for every other value
# NOTE(review): missing / unexpected values also end up as 0 here, and as 4
# in subject_race below -- confirm this is intended.
df = df.withColumn("subject_sex", _encode_labels("subject_sex", {"male": 1}, 0))

# lat and lng become float columns
for coordinate in ("lat", "lng"):
    df = df.withColumn(coordinate, col(coordinate).cast("float"))

# subject_race: white=0, black=1, hispanic=2, asian=3, anything else=4
race_codes = {"white": 0, "black": 1, "hispanic": 2, "asian": 3}
df = df.withColumn("subject_race", _encode_labels("subject_race", race_codes, 4))

# search_vehicle: TRUE=1, FALSE=0, anything else becomes null
df = df.withColumn("search_vehicle", _encode_labels("search_vehicle", {"TRUE": 1, "FALSE": 0}, None))

# vehicle_year: integer; values that are missing, unparsable, or outside
# 1900..2022 are replaced by 0
vehicle_year = col("vehicle_year").cast("integer")
df = df.withColumn(
    "vehicle_year",
    when(vehicle_year.between(1900, 2022), vehicle_year).otherwise(0),
)

# Merge the 'date' (yyyy-mm-dd) and 'time' (hh:mm:ss) columns into a single
# 'timestamp' column, then discard the two source columns
date_and_time = concat_ws(" ", col("date"), col("time"))
df = (
    df.withColumn("timestamp", to_timestamp(date_and_time, "yyyy-MM-dd HH:mm:ss"))
    .drop("date", "time")
)

# search_conducted: 'TRUE' or 'citation' -> 1, 'FALSE' -> 0, anything else
# becomes null
search_flag = col("search_conducted")
df = df.withColumn(
    "search_conducted",
    when(search_flag.isin("TRUE", "citation"), 1)
    .when(search_flag == "FALSE", 0)
    .otherwise(None)
    .cast("integer"),
)

### Parsing the violation column

In [None]:
# Show all data in the violation column
# df.select('violation').distinct().show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `violation` cannot be resolved. Did you mean one of the following? [`location`, `lat`, `lng`, `timestamp`, `county_name`].;
'Project ['violation]
+- Relation [location#17,lat#18,lng#19,county_name#20,subject_race#21,subject_sex#22,search_conducted#23,search_vehicle#24,vehicle_year#25,timestamp#26] csv


In [None]:
# Turn 'violation' into an integer flag: 1 when the text mentions speeding,
# 0 otherwise. Missing values do not match the pattern and therefore get 0.
# The match is case-insensitive ("(?i)") so that capitalised spellings such
# as "Speeding" are recognised too; like("%speeding%") is case-sensitive and
# would silently skip them.
# NOTE(review): presumably 'violation' still holds the free-text description
# at this point -- re-running this cell on the already encoded column would
# reset every value to 0.
df = df.withColumn(
    "violation",
    when(col("violation").rlike("(?i)speeding"), 1).otherwise(0).cast("integer"),
)

In [None]:
# List each column name together with its Spark data type
df.dtypes

[('location', 'string'),
 ('lat', 'float'),
 ('lng', 'float'),
 ('county_name', 'string'),
 ('subject_race', 'int'),
 ('subject_sex', 'int'),
 ('search_conducted', 'int'),
 ('search_vehicle', 'int'),
 ('vehicle_year', 'int'),
 ('timestamp', 'timestamp')]

In [None]:
# Preview the first rows of the DataFrame
df.show()

+--------------------+---------+----------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|            location|      lat|       lng|     county_name|subject_race|subject_sex|search_conducted|search_vehicle|vehicle_year|          timestamp|
+--------------------+---------+----------+----------------+------------+-----------+----------------+--------------+------------+-------------------+
|route: 0059, mile...|     NULL|      NULL|     Cass County|           0|          1|               0|             0|        1997|2012-03-27 22:10:00|
|route: 0020, mile...|     NULL|      NULL|   Parker County|           0|          1|               0|             0|        2002|2012-03-27 22:11:00|
|route: 0044, mile...|33.958683|-98.529686|  Wichita County|           0|          1|               0|             0|        2000|2012-03-27 22:11:00|
|route: 1788, mile...|     NULL|      NULL|  Andrews County|           2|          1|         

In [None]:
# Print the number of rows in the DataFrame
print(df.count())

19752786


In [None]:
# Intermediate save of the cleaned DataFrame as Parquet, overwriting any
# previous output at this path
df.write.mode('overwrite').parquet('../data/tx_statewide_2020_04_01-002_clean.parquet')

24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
24/05/03 22:50:10 WARN MemoryManager: Total allocation exceeds 95.

_____

In [None]:
pip install pyarrow>=4.0.0

zsh:1: 4.0.0 not found


Note: you may need to restart the kernel to use updated packages.


In [None]:
from pyspark.sql import SparkSession
import os
import pandas as pd
from functools import partial

# Initialize Spark session
spark = SparkSession.builder \
    .appName("parsing") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Single CSV file produced by this cell
CLEAN_CSV_PATH = '../data/tx_statewide_2020_04_01-002_clean.csv'

# Number of rows converted to pandas and appended to the CSV per batch
CSV_BATCH_ROWS = 100_000

# Read the parquet file
df = spark.read.format('parquet').load('../data/tx_statewide_2020_04_01-002_clean.parquet')

# Remove a previous export so the header is written exactly once below
if os.path.exists(CLEAN_CSV_PATH):
    os.remove(CLEAN_CSV_PATH)


def append_to_csv(pandas_df, filename, header=True, index=False):
    """Append a pandas DataFrame to `filename` in CSV format.

    pandas_df: the frame to write.
    filename: target CSV path; it is opened in append mode.
    header: whether to emit the column-name line before the rows.
    index: whether to write the pandas index as a column.
    """
    pandas_df.to_csv(filename, mode='a', header=header, index=index)


def write_partition_to_csv(column_names, iterator):
    """Append one batch of rows to the clean CSV file.

    column_names: column labels for the rows.
    iterator: iterable of row tuples; it is fully materialised in memory.

    The header line is written only when the target file does not exist yet.
    That check is only reliable when batches are written one after another,
    so this must not be called from concurrent tasks.
    """
    pandas_df = pd.DataFrame(list(iterator), columns=column_names)
    if not pandas_df.empty:
        append_to_csv(pandas_df, CLEAN_CSV_PATH, header=not os.path.exists(CLEAN_CSV_PATH), index=False)


def _iter_batches(rows, batch_size):
    """Yield consecutive lists of at most `batch_size` items taken from `rows`."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


# Capture column names once, outside the write loop
column_names = df.columns

# Stream the rows to the driver and write them sequentially. Writing from
# foreachPartition lets several partitions append to the same file at the
# same time, which can duplicate the header and interleave lines, and on a
# cluster the file would be created on the executors instead of here.
for batch in _iter_batches(df.toLocalIterator(), CSV_BATCH_ROWS):
    write_partition_to_csv(column_names, batch)

                                                                                