In [22]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col, to_date, rank, lag, lead
from pyspark.sql.window import Window

In [2]:
# Start (or reuse) a Spark session named "Drug Data Analysis" on the local
# master; "local[*]" asks Spark for one worker thread per logical core.
spark = SparkSession.builder.appName("Drug Data Analysis").master("local[*]").getOrCreate()
# Expose the session's underlying SparkContext as `sc`.
sc = spark.sparkContext

In [3]:
# Path (relative to the notebook's working directory) of the JSON file that
# is loaded into Spark below.
filename = "drug-enforcement.json"

In [4]:
!head -n 1000 drug-enforcement.json

[
    {
      "classification": "Class II",
      "center_classification_date": "20121025",
      "report_date": "20121031",
      "postal_code": "08816-2108",
      "termination_date": "20141007",
      "recall_initiation_date": "20120904",
      "recall_number": "D-026-2013",
      "city": "East Brunswick",
      "event_id": "63384",
      "distribution_pattern": "Nationwide",
      "openfda": {},
      "recalling_firm": "Raritan Pharmaceuticals, Inc.",
      "voluntary_mandated": "Voluntary: Firm Initiated",
      "state": "NJ",
      "reason_for_recall": "Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.",
      "initial_firm_notification": "E-Mail",
      "status": "Terminated",
      "product_type": "Drugs",
      "country": "United States",
      "product_description": "Wal-Mucil 100% Natural Fiber, 100% Natural Psyllium Seed Husk, Fiber Laxative/

In [5]:
# Load the whole file as one DataFrame; the multiLine option is needed because
# the file is a pretty-printed JSON array whose records span many lines.
drug_data = spark.read.option("multiLine", True).json(filename)

In [6]:
# Print the schema Spark inferred from the JSON file (column names and types).
drug_data.printSchema()

root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- openfda: struct (nullable = true)
 |    |-- application_number: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- brand_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- generic_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- is_original_packager: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- manufacturer_name: array (nullab

In [9]:
# Convert report_date from string to a real date column.
# The raw file stores dates as compact 8-digit text (e.g. "20121031"), so the
# parse pattern must be 'yyyyMMdd'; a dashed pattern such as 'yyyy-MM-dd' does
# not match that layout and yields no usable dates, which would make any
# date-ordered window (lag/lead over report_date) meaningless.
# NOTE: this rebinds drug_data. Re-running this cell on the already-converted
# column will not parse again -- re-run the read cell first.
drug_data = drug_data.withColumn('report_date', to_date(col('report_date'), 'yyyyMMdd'))


In [10]:
# Re-print the schema to confirm that report_date changed from string to date.
drug_data.printSchema()

root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- openfda: struct (nullable = true)
 |    |-- application_number: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- brand_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- generic_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- is_original_packager: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- manufacturer_name: array (nullab

In [11]:
# Number of rows per city; the result has the columns `city` and `count`.
city_report_counts = (
    drug_data
    .groupBy(col('city'))
    .count()
)

In [17]:
# One global window ordered by descending row count. rank() gives tied cities
# the same rank and leaves a gap after them.
windowSpec = Window.orderBy(col('count').desc())
ranked_city_reports = city_report_counts.select(
    '*',
    rank().over(windowSpec).alias('city_rank'),
)


In [18]:
# Show each city's rank based on its report count.
ranked_city_reports.show()

+-----------+-----+---------+
|       city|count|city_rank|
+-----------+-----+---------+
|  Vancouver|  539|        1|
|  Asheville|  507|        2|
|    Toronto|  499|        3|
|Lake Forest|  458|        4|
|  Knoxville|  383|        5|
|      Ocala|  325|        6|
|    Newbern|  299|        7|
|    Houston|  241|        8|
|Little Rock|  217|        9|
|  Princeton|  150|       10|
|  Aquebogue|  140|       11|
| Boca Raton|  132|       12|
| Naperville|  118|       13|
|Noblesville|  117|       14|
|  Las Vegas|  115|       15|
|    Orlando|  115|       15|
|       Mesa|  107|       17|
| Birmingham|  105|       18|
|  Henderson|  105|       18|
| Moses Lake|  101|       20|
+-----------+-----+---------+
only showing top 20 rows



In [26]:
# Collect the distinct values of the classification column to the driver as
# a list of Row objects.
unique_values = (
    drug_data
    .select(col('classification'))
    .distinct()
    .collect()
)

In [30]:
# Inspect the distinct values of the classification column so the exact
# 'Class III' label is known before filtering on it.
unique_values

[Row(classification='Class II'),
 Row(classification='Class I'),
 Row(classification='Not Yet Classified'),
 Row(classification='Class III')]

In [28]:
# Keep only the rows classified as 'Class III', then count them per city.
class_iii_counts = (
    drug_data
    .where(col('classification') == 'Class III')
    .groupBy('city')
    .count()
)

In [29]:
# Show the per-city counts of 'Class III' rows (show() prints the first 20 rows).
class_iii_counts.show()

+------------------+-----+
|              city|count|
+------------------+-----+
|            Corona|   17|
|        Charleston|    1|
|     Center Valley|    1|
|            Wilson|    1|
|       Cherry Hill|    1|
|         Levittown|    1|
|        Brookhaven|    5|
|      Chesterbrook|    1|
|      Cedar Knolls|    2|
|     North Chicago|    5|
|Whitehouse Station|    4|
|      Fort Collins|    1|
|     Central Islip|    3|
|              Kadi|    1|
|             Davie|    3|
|        Morgantown|   17|
|      Olive Branch|    2|
|            Dallas|    1|
|      Chesterfield|    1|
|           Peapack|    6|
+------------------+-----+
only showing top 20 rows



In [23]:
# Per-city window, with rows ordered by report_date inside each city.
city_report_Date_Window = Window.partitionBy('city').orderBy('report_date')

# For every row, attach the report_date of the previous and of the next row
# in the same city (null at the first / last row of each city).
drug_data_new = (
    drug_data
    .withColumn('prev_report_date', lag('report_date', 1).over(city_report_Date_Window))
    .withColumn('next_report_date', lead('report_date', 1).over(city_report_Date_Window))
)




In [24]:
# Show the first 20 rows of every column, including the added
# prev_report_date and next_report_date columns at the far right.
drug_data_new.show()

+--------------------+---------+--------------------------+---------+--------------+--------------------+-------------+--------------------+--------+-------------------------+--------------+--------------------+-----------+--------------------+----------------+------------+--------------------+----------------------+-------------+--------------------+-----------+-----+----------+----------------+--------------------+----------------+----------------+
|           address_1|address_2|center_classification_date|     city|classification|           code_info|      country|distribution_pattern|event_id|initial_firm_notification|more_code_info|             openfda|postal_code| product_description|product_quantity|product_type|   reason_for_recall|recall_initiation_date|recall_number|      recalling_firm|report_date|state|    status|termination_date|  voluntary_mandated|prev_report_date|next_report_date|
+--------------------+---------+--------------------------+---------+--------------+------

In [25]:
# Print the schema to confirm that the new columns prev_report_date and
# next_report_date were added.
drug_data_new.printSchema()

root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- openfda: struct (nullable = true)
 |    |-- application_number: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- brand_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- generic_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- is_original_packager: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- manufacturer_name: array (nullab