<a href="https://colab.research.google.com/github/abhaykagalkar/Testbcg/blob/main/BCG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mount G-Drive to Colab

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:

# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
!pip install -q findspark


In [4]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 63 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=8b16f28a9cedb88fa984be278345b9f982a62f2b46c3f8c9d0e3bac04ad4fd23
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


**SET JAVA and Spark Env PATHS**

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


In [6]:

import findspark
findspark.init()

In [7]:

from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import *

In [8]:
spark = SparkSession.builder\
        .master("local")\
        .appName("BCG")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

# Read all the CSVs required for Analysis

In [11]:
Charges=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Charges_use.csv")
Damages=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Damages_use.csv")
Endorse=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Endorse_use.csv")
PrimaryPerson=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Primary_Person_use.csv")
Restrict=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Restrict_use.csv")
Units=spark.read.options(header=True, inferSchema=True).csv("drive/MyDrive/DATA/BCG/Units_use.csv")

# Clean the Staged Files 

In [12]:

Charges=Charges.dropDuplicates()
Damages=Damages.dropDuplicates()
Endorse=Endorse.dropDuplicates()
PrimaryPerson=PrimaryPerson.dropDuplicates()
Restrict=Restrict.dropDuplicates()
Units=Units.dropDuplicates()

# Analysis 1

In [13]:
PrimaryPerson_1=PrimaryPerson.filter((col("death_cnt")>=1) & (lower("PRSN_GNDR_ID")=='male'))
Analysis_1=PrimaryPerson_1.select("Crash_ID").distinct()
Analysis_1.count()

180

# Analysis 2

In [15]:
Charges_2 = Charges.select("CRASH_ID","UNIT_NBR").distinct()
Analysis_2 = Units.join(Charges_2,on=["CRASH_ID","UNIT_NBR"],how="inner").filter(lower(col("VEH_BODY_STYL_ID")).isin(['motorcycle','police motorcycle']))
Analysis_2.count()

401

# Analysis 3

In [16]:
PrimaryPerson.filter(lower(col("PRSN_GNDR_ID")) == "female")\
  .groupBy("DRVR_LIC_STATE_ID")\
  .agg(count(lit(1)).alias("Count"))\
  .orderBy(col("Count").desc()).limit(1)\
  .select("DRVR_LIC_STATE_ID").show()


+-----------------+
|DRVR_LIC_STATE_ID|
+-----------------+
|            Texas|
+-----------------+



# Analysis 4

In [17]:
Units4=Units.dropDuplicates(["CRASH_ID","UNIT_NBR"])
Units4=Units4.withColumn("Injury_Death",col("TOT_INJRY_CNT")+col("DEATH_CNT")).groupby("VEH_MAKE_ID").sum("Injury_Death")
Units4=Units4.filter(~col("VEH_MAKE_ID").isin('NA'))
Analysis_4=Units4.withColumn("Filter",row_number().over(Window.orderBy(col("sum(Injury_Death)").desc())))
Analysis_4=Analysis_4.filter((col("Filter")>=5) & (col("Filter")<=15)).orderBy("Filter")
Analysis_4.select("VEH_MAKE_ID").show()

+-----------+
|VEH_MAKE_ID|
+-----------+
|     NISSAN|
|      HONDA|
|        GMC|
|    HYUNDAI|
|        KIA|
|       JEEP|
|   CHRYSLER|
|      MAZDA|
| VOLKSWAGEN|
|    PONTIAC|
|      LEXUS|
+-----------+



# Analysis 5

In [20]:
Units5 = Units.dropDuplicates(["CRASH_ID", "UNIT_NBR"])
Analysis_5 = (
    Units5.join(PrimaryPerson, on=["CRASH_ID", "UNIT_NBR"], how="inner")
    .groupBy("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID")
    .agg(count(lit(1)).alias("Count"))
    .withColumn(
        "Filter",
        row_number().over(
            Window.partitionBy("VEH_BODY_STYL_ID").orderBy(col("Count").desc())
        ),
    )
    .filter(col("Filter") == 1)
)
Analysis_5.select("VEH_BODY_STYL_ID", "PRSN_ETHNICITY_ID").show()


+--------------------+-----------------+
|    VEH_BODY_STYL_ID|PRSN_ETHNICITY_ID|
+--------------------+-----------------+
|                 BUS|            BLACK|
|                  NA|            WHITE|
|                 VAN|            WHITE|
|              PICKUP|            WHITE|
|SPORT UTILITY VEH...|            WHITE|
|PASSENGER CAR, 4-...|            WHITE|
|          FIRE TRUCK|            WHITE|
|               TRUCK|            WHITE|
|             UNKNOWN|          UNKNOWN|
|           AMBULANCE|            WHITE|
|    POLICE CAR/TRUCK|            WHITE|
|          MOTORCYCLE|            WHITE|
|   YELLOW SCHOOL BUS|            BLACK|
|   POLICE MOTORCYCLE|            WHITE|
|PASSENGER CAR, 2-...|            WHITE|
|       TRUCK TRACTOR|            WHITE|
|      FARM EQUIPMENT|            WHITE|
|NEV-NEIGHBORHOOD ...|            WHITE|
|OTHER  (EXPLAIN I...|            WHITE|
|        NOT REPORTED|            WHITE|
+--------------------+-----------------+



# Analysis 6

In [22]:
Units6 = Units.dropDuplicates(["CRASH_ID", "UNIT_NBR"])
Units6 = Units6.filter(lower(col("VEH_BODY_STYL_ID")).like("%car%"))
Units6 = Units6.filter(
    (lower(col("CONTRIB_FACTR_1_ID")).like("%alcohol%"))
    | (lower(col("CONTRIB_FACTR_2_ID")).like("%alcohol%"))
    | (lower(col("CONTRIB_FACTR_P1_ID")).like("%alcohol%"))
)
PrimaryPerson6 = PrimaryPerson.filter(col("DRVR_ZIP").isNotNull())
Analysis_6 = (
    Units6.join(PrimaryPerson6, on=["CRASH_ID", "UNIT_NBR"], how="inner")
    .groupBy("DRVR_ZIP")
    .agg(count(lit(1)).alias("Count"))
    .orderBy(col("Count").desc())
)
Analysis_6.select("DRVR_ZIP").show(5)


+--------+
|DRVR_ZIP|
+--------+
|   75052|
|   76010|
|   75067|
|   78521|
|   78130|
+--------+
only showing top 5 rows



# Analysis 7


In [27]:
insuranceTypeArr = [
    "PROOF OF LIABILITY INSURANCE",
    "INSURANCE BINDER",
    "CERTIFICATE OF SELF-INSURANCE",
]
vehicleDamageScalesArr = ["DAMAGED 5", "DAMAGED 6", "DAMAGED 7 HIGHEST"]
Units7 = Units.dropDuplicates(["CRASH_ID", "UNIT_NBR"])
Units7 = Units7.filter(lower(col("VEH_BODY_STYL_ID")).like("%car%"))
Units7 = Units7.filter(col("FIN_RESP_TYPE_ID").isin(*insuranceTypeArr))

Units7 = Units7.filter(
    (upper(col("VEH_DMAG_SCL_1_ID")).isin(*vehicleDamageScalesArr))
    | (upper(col("VEH_DMAG_SCL_2_ID")).isin(*vehicleDamageScalesArr))
)

Analysis_7 = Units7.join(Damages, on=["CRASH_ID"], how="left_anti")
Analysis_7 = Analysis_7.select("CRASH_ID").distinct()
Analysis_7.count()


3862

# Analysis 8

In [44]:
top25cities=['TX','MX','LA','NM','CA','FL','OK','AR','AZ','GA','IL','CO','MS','MO','TN','NC','KS','AL','OH','MI','NY','WA','VA','NV']
## licenced Drivers
PrimaryPerson8 = PrimaryPerson.filter(
    ~col("DRVR_LIC_CLS_ID").isin("NA", "UNKNOWN", "UNLICENSED")
)

## Drivers charged for Speeding
Charges8 = Charges.filter(lower(col("CHARGE")).like("%speed%"))
Charges8 = Charges8.dropDuplicates(["CRASH_ID", "UNIT_NBR"])


# top 10 Vehicle colours
Units8 = Units.dropDuplicates(["CRASH_ID", "UNIT_NBR"])
Units8 = Units8.groupBy("VEH_COLOR_ID").count().filter(~col("VEH_COLOR_ID").isin("NA"))
Analysis_8_2 = Units8.withColumn(
    "Filter", row_number().over(Window.orderBy(col("count").desc()))
)
Analysis_8_2 = Analysis_8_2.filter(col("Filter") <= 10)

# top 25 cities
Charges8_1 = Charges.dropDuplicates(["CRASH_ID", "UNIT_NBR"])
Analysis_8_1 = Charges8_1.join(
    PrimaryPerson,
    (PrimaryPerson.CRASH_ID == Charges8_1.CRASH_ID)
    & (PrimaryPerson.UNIT_NBR == Charges8_1.UNIT_NBR),
    "inner",
)

Analysis_8_1 = (
    Analysis_8_1.groupBy("DRVR_LIC_STATE_ID")
    .count()
    .filter(~col("DRVR_LIC_STATE_ID").isin("NA", "Unknown"))
)

Analysis_8_1 = Analysis_8_1.withColumn(
    "Filter", row_number().over(Window.orderBy(col("count").desc()))
)
Analysis_8_1 = Analysis_8_1.filter(col("Filter") <= 25)
Units8 = Units.dropDuplicates(["CRASH_ID", "UNIT_NBR"])

Analysis8 = (
    Units8.join(
        PrimaryPerson8,
        (Units8.CRASH_ID == PrimaryPerson8.CRASH_ID)
        & (Units8.UNIT_NBR == PrimaryPerson8.UNIT_NBR),
        "inner",
    )
    .join(
        Charges8,
        (Units8.CRASH_ID == Charges8.CRASH_ID) & (Units8.UNIT_NBR == Charges8.UNIT_NBR),
        "inner",
    )
    .join(Analysis_8_2, Analysis_8_2.VEH_COLOR_ID == Units8.VEH_COLOR_ID, "inner")
    .filter(col("VEH_LIC_STATE_ID").isin(top25cities))
)

Analysis8 = Analysis8.groupBy("VEH_MAKE_ID").agg(count(lit(1)).alias("Count"))
Analysis8 = Analysis8.withColumn(
    "Filter", row_number().over(Window.orderBy(col("Count").desc()))
)
Analysis8 = Analysis8.filter(col("Filter") <= 5).select("VEH_MAKE_ID").distinct()
Analysis8.show()


+-----------+
|VEH_MAKE_ID|
+-----------+
|       FORD|
|  CHEVROLET|
|     TOYOTA|
|      DODGE|
|      HONDA|
+-----------+

