In [1]:
import pandas as pd
import time 
from functools import reduce
# Wall-clock start of the run; the elapsed time is printed near the end of the notebook
start_time = time.time()

In [2]:
# drugs = pd.read_csv('arcos-ca-statewide-itemized.tsv',nrows = 1000000, sep = '\t')
# drugs = pd.read_csv('arcos-ca-statewide-itemized.tsv', sep = '\t')

# Connecting to S3

In [3]:
from pyspark import SparkContext, SparkConf
import os
# Set before the SparkContext is requested below: asks spark-submit to pull the
# hadoop-aws 2.7.4 package (presumably for the s3:// reads further down —
# TODO confirm the version matches the Hadoop build in use)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.4" pyspark-shell'
!echo $JAVA_HOME
# Reuse the active SparkContext if one exists, otherwise create one
sc = SparkContext.getOrCreate()

# Your AWS S3 Info here
# with open("../../../Keys/aws.csv") as f:
#     data = f.read().splitlines()[1].split(",")
#access = data[2]
# sec = data[3]

# # YOUR AMAZON LOGIN INFORMATION HERE
# sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', f"{access}")

# sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', f"{sec}")

# FIX THIS S3 STUFF BEFORE RUNNING




## Working on sample of Drug Dataset to make a DataFrame

In [6]:
# Agg Sum - CALC_BASE_WT_IN_GM, QUANTITY
# Ingredient_Name has two options: HYDROCODONE BITARTRATE HEMIPENTAHYDRATE or OXYCODONE HYDROCHLORIDE

In [7]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
#from pyspark.sql.functions import concat, col, lit, substring

# Session handle; used at the end of the notebook to query the written output with SQL
ss = SparkSession.builder.getOrCreate()

## Local sample file: every text line is split on tabs into a list of string fields
drug_rdd = sc.textFile('../arcos-ca-statewide-sample.tsv').map(lambda line: line.split('\t'))
# drug_rdd = sc.textFile('s3://data-systems-opioid/arcos-ca-statewide-itemized.tsv',24).map(lambda x: x.split('\t'))


def FloatSafe(value): # In case there are non-float type to be converted.
    """Convert `value` to a float, or return None when it cannot be converted.

    Args:
        value: Anything `float()` accepts (typically a string field from a
            split text line).

    Returns:
        The float value, or None if `value` is unparsable text, None, or a
        type `float()` does not support.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        # ValueError: text that is not a number (including the empty string).
        # TypeError: None or a non-scalar such as a list.
        return None

def IntegerSafe(value): # In case there are non-integer type to be converted.
    """Convert `value` to an int, or return None when it cannot be converted.

    Args:
        value: Anything `int()` accepts (typically a string field from a
            split text line).

    Returns:
        The int value, or None if `value` is not an integer literal (for
        example "1.5" or ""), is None, is an unsupported type, or is an
        infinite float.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # ValueError: non-integer text or NaN; TypeError: None / non-scalar;
        # OverflowError: int(float('inf')).
        return None

# To reduce size, keep only columns 16-24 and 29 onward, with column 11 appended last.
# This drops the identifier columns 0-15 (except 11) and columns 25-28.
# persist() marks the trimmed RDD for caching so later passes over it can reuse it.
drug_rdd = drug_rdd.map(lambda x: x[16:25] + x[29:] + [x[11]]).persist()

# Columns I removed (index 11, presumably BUYER_BUS_ACT, is kept and appended as the last column):
# -------------------------
# 0'REPORTER_DEA_NO',
#  'REPORTER_BUS_ACT',
#  'REPORTER_NAME',
#  'REPORTER_ADDL_CO_INFO',
#  'REPORTER_ADDRESS1',
#  'REPORTER_ADDRESS2',
#  'REPORTER_CITY',
#  'REPORTER_STATE',
#  'REPORTER_ZIP',
#  'REPORTER_COUNTY'
#10'BUYER_DEA_NO',
#12 'BUYER_NAME',
#  'BUYER_ADDL_CO_INFO',
#  'BUYER_ADDRESS1',
#15'BUYER_ADDRESS2'

# 25 UNIT (0.001% of rows have values)
# 26 Action Indicator
# 27 ORDER_FORM_NO
# 28 CORRECTION_NO

# The first record of the trimmed RDD is the header; reuse it as the column names
col_names = drug_rdd.first()

# Drop the header row from the data
drug_rdd = drug_rdd.filter(lambda row: row != col_names)

# Cast the numeric fields by position (23 fields per row); all other fields stay strings
drug_rdd = drug_rdd.map(lambda row: [
    IntegerSafe(row[i]) if i in (5, 10, 13)
    else FloatSafe(row[i]) if i in (8, 9, 11, 12, 17, 21)
    else row[i]
    for i in range(23)
])


In [8]:
# Build the DataFrame and add the ZIP-YEAR join key in one chain:
#   Year     = last four characters of TRANSACTION_DATE
#   ZIP-YEAR = "<BUYER_ZIP>-<Year>"
drug_df = (
    drug_rdd.toDF(col_names)
    .withColumn('Year', substring('TRANSACTION_DATE', -4, 4))
    .withColumn('ZIP-YEAR', concat(col("BUYER_ZIP"), lit("-"), col("Year")))
)
drug_df.select("BUYER_ZIP", "Year", 'ZIP-YEAR').show(10)

+---------+----+----------+
|BUYER_ZIP|Year|  ZIP-YEAR|
+---------+----+----------+
|    93003|2007|93003-2007|
|    92649|2006|92649-2006|
|    92653|2006|92653-2006|
|    92113|2006|92113-2006|
|    92113|2006|92113-2006|
|    91301|2007|91301-2007|
|    92584|2007|92584-2007|
|    93402|2012|93402-2012|
|    93065|2012|93065-2012|
|    95536|2011|95536-2011|
+---------+----+----------+
only showing top 10 rows



### Drug Dataset feature engineering

In [9]:
# Because we aggregate everything, we may not have to worry about removing nulls

#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    """Replace each column named in `cols` with its StringIndexer-encoded values.

    For every name, a StringIndexer is fitted on the current frame and writes
    the indexed values to a temporary "<name>-num" column; the original column
    is then dropped and the temporary column is renamed back to "<name>".

    Args:
        df: Input DataFrame.
        cols: Iterable of column names to index.

    Returns:
        A DataFrame in which every listed column holds the indexed values.
    """
    result = df
    for name in cols:
        indexed_name = name + "-num"
        # Fitting is what lets the model learn the distinct values of the column
        model = StringIndexer(inputCol=name, outputCol=indexed_name).fit(result)
        result = (
            model.transform(result)
            .drop(name)
            .withColumnRenamed(indexed_name, name)
        )
    return result

# BUYER_COUNTY is replaced in place by its StringIndexer numeric label
drug_df = indexStringColumns(drug_df, ["BUYER_COUNTY"])

In [10]:
from pyspark.sql.functions import udf,col
# We wrote our own method to One Hot Encode in a more PANDAS way
# This will also allow us to aggregate easier
def ourOneHotEncoder(df, col_name):
    """Add one 0/1 integer column per distinct non-null value of `col_name`.

    Each new column is named "<col_name>_<value>" and holds 1 where the row's
    value equals that category and 0 otherwise (rows whose value is null get 0
    in every flag column). The original column is kept.

    Args:
        df: Input DataFrame.
        col_name: Name of the categorical column to expand.

    Returns:
        The DataFrame with the flag columns appended, in sorted category order.
    """
    # Local import so the function does not depend on an earlier star import
    from pyspark.sql.functions import when

    # Null is not a category: it cannot be sorted against strings and cannot be
    # concatenated into a column name.
    categories = [
        value
        for value in df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
        if value is not None
    ]
    categories.sort()
    for category in categories:
        # str() keeps the name construction working for non-string categories
        new_column_name = col_name + '_' + str(category)
        # Native column expression instead of a per-category Python UDF; a null
        # comparison falls through to otherwise(0), matching the UDF's result.
        flag = when(col(col_name) == category, 1).otherwise(0).cast(IntegerType())
        df = df.withColumn(new_column_name, flag)
    return df

# Adds "BUYER_BUS_ACT_<value>" and "DRUG_NAME_<value>" flag columns; the
# aggregation further down refers to these generated names explicitly.
drug_df = ourOneHotEncoder(drug_df, "BUYER_BUS_ACT")
drug_df = ourOneHotEncoder(drug_df, "DRUG_NAME")

# Zip Area DataFrame

In [11]:
# Read the ZCTA area CSV (24 minimum partitions) into lists of string fields
pop_sq_mile = sc.textFile('s3://data-systems-opioid/ZCTA_area.csv', 24).map(lambda line: line.split(','))
# The first record is the header row
col_names = pop_sq_mile.take(1)[0]
# Drop the header, keep only the ZIP and area columns, and rename the key so it
# matches the "ZIP Code" column of the suicide data
pop_df = (
    pop_sq_mile
    .filter(lambda row: row != col_names)
    .toDF(col_names)
    .select("ZIPcode", "Area_ac", "Area_sqmi")
    .withColumnRenamed("ZIPcode", "ZIP Code")
)
pop_df.show(10)

+--------+-----------+-----------+
|ZIP Code|    Area_ac|  Area_sqmi|
+--------+-----------+-----------+
|   21914|478.8578796|0.748215437|
|    1001|7877.861328|12.30915833|
|   34736|98956.84375|154.6200714|
|   46151|133556.1875|208.6815491|
|   48039|15943.14258|24.91115952|
|    1521|8370.506836|13.07891655|
|   49892|106387.2031|166.2299957|
|   70639|44236.84375|69.12007141|
|   56755|30904.10352|48.28766251|
|   64723|29107.58594|45.48060226|
+--------+-----------+-----------+
only showing top 10 rows



# Suicide DataFrame

In [12]:
death_rdd = sc.textFile('s3://data-systems-opioid/CA_suicides.csv',24).map(lambda line: line.split(','))

# The header row supplies the column names
col_names = death_rdd.first()

# Remove the header row, then cast the 4th and 5th fields to integers
# (the first three fields stay strings)
death_rdd = (
    death_rdd
    .filter(lambda row: row != col_names)
    .map(lambda row: [row[0], row[1], row[2], IntegerSafe(row[3]), IntegerSafe(row[4])])
)

# To SQL DataFrame, adding the rate per thousand and the ZIP-YEAR key for the merge
death_df = (
    death_rdd.toDF(col_names)
    .withColumn('SUI_per_thousand', col('Count') / col('Population_2018') * 1000)
    .withColumn('ZIP-YEAR', concat(col("ZIP Code"), lit("-"), col("Year")))
)

In [13]:
# Attach the area columns by ZIP code; the left outer join keeps every
# death_df row even when no area record matches
death_df = death_df.join(pop_df, on="ZIP Code", how="leftouter")
death_df.show(10)

+--------+----+---------------+-----+---------------+--------------------+----------+-----------+-----------+
|ZIP Code|Year|Causes of Death|Count|Population_2018|    SUI_per_thousand|  ZIP-YEAR|    Area_ac|  Area_sqmi|
+--------+----+---------------+-----+---------------+--------------------+----------+-----------+-----------+
|   90022|1999|            SUI|    3|          67446| 0.04448002846721822|90022-1999|2806.332031|4.384893894|
|   90022|2000|            SUI|    2|          67446|0.029653352311478816|90022-2000|2806.332031|4.384893894|
|   90022|2001|            SUI|    2|          67446|0.029653352311478816|90022-2001|2806.332031|4.384893894|
|   90022|2002|            SUI|    4|          67446| 0.05930670462295763|90022-2002|2806.332031|4.384893894|
|   90022|2003|            SUI|    7|          67446| 0.10378673309017585|90022-2003|2806.332031|4.384893894|
|   90022|2004|            SUI|    4|          67446| 0.05930670462295763|90022-2004|2806.332031|4.384893894|
|   90022|

### Calculate the Population Per Square Mile

In [14]:
# Population density: Population_2018 divided by the ZIP area in square miles
death_df = death_df.withColumn("Pop_per_sqmi", col('Population_2018') / col('Area_sqmi'))

## Joining drug_df and death_df

In [15]:
# Change this join based on what flags we add
# One output row per ZIP-YEAR. min/count/sum/avg here are the
# pyspark.sql.functions versions (star-imported in an earlier cell), not the
# Python builtins.
# The order of the expressions matters: the result columns are renamed by
# position further down.
# The DRUG_NAME_* / BUYER_BUS_ACT_* names are the flag columns generated by
# ourOneHotEncoder, so they depend on the category values present in the data.
drug_agg_df = drug_df.groupBy('ZIP-YEAR').agg(
    min('BUYER_COUNTY'), # Should only be one per ZIP
    count('BUYER_CITY'),
    sum('STRENGTH'),
    sum('QUANTITY'),
    sum('CALC_BASE_WT_IN_GM'),
    sum('dos_str'),
    avg('DOSAGE_UNIT'),
    sum('DRUG_NAME_HYDROCODONE'),
    sum('DRUG_NAME_OXYCODONE'),
    sum('BUYER_BUS_ACT_CHAIN PHARMACY'),
    sum('BUYER_BUS_ACT_PRACTITIONER'),
    sum('BUYER_BUS_ACT_PRACTITIONER-DW/100'),
    sum('BUYER_BUS_ACT_PRACTITIONER-DW/275'),
    sum('BUYER_BUS_ACT_PRACTITIONER-DW/30'),
    sum('BUYER_BUS_ACT_RETAIL PHARMACY')
)

# drug_agg_df.show(5)

+----------+-----------------+-----------------+-------------+-------------+-----------------------+------------+------------------+--------------------------+------------------------+---------------------------------+-------------------------------+--------------------------------------+--------------------------------------+-------------------------------------+----------------------------------+
|  ZIP-YEAR|min(BUYER_COUNTY)|count(BUYER_CITY)|sum(STRENGTH)|sum(QUANTITY)|sum(CALC_BASE_WT_IN_GM)|sum(dos_str)|  avg(DOSAGE_UNIT)|sum(DRUG_NAME_HYDROCODONE)|sum(DRUG_NAME_OXYCODONE)|sum(BUYER_BUS_ACT_CHAIN PHARMACY)|sum(BUYER_BUS_ACT_PRACTITIONER)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/100)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/275)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/30)|sum(BUYER_BUS_ACT_RETAIL PHARMACY)|
+----------+-----------------+-----------------+-------------+-------------+-----------------------+------------+------------------+--------------------------+------------------------+------------

### How to aggregate each column
- BUYER_COUNTY - min (later string encode)
- BUYER_BUS_ACT - one hot encode sum and divide by zip's count()
- DRUG_NAME	(after encoded) - sum
- count(BUYER_CITY) - do we even need this?
- sum(STRENGTH)
- sum(QUANTITY)
- sum(CALC_BASE_WT_IN_GM)
- sum(dos_str)
- Count - total count
- average (DOSAGE_UNIT)

In [16]:
# Only the join key and the three death-side columns are carried into the join
death_join_df = death_df.select('ZIP-YEAR', 'Count', 'SUI_per_thousand', "Pop_per_sqmi")
# Because this is our response variable, we must inner join
drug_death_df = drug_agg_df.join(death_join_df, on='ZIP-YEAR', how='inner')
drug_death_df.show(10)

+----------+-----------------+-----------------+-------------+-------------+-----------------------+------------------+------------------+--------------------------+------------------------+---------------------------------+-------------------------------+--------------------------------------+--------------------------------------+-------------------------------------+----------------------------------+-----+--------------------+------------------+
|  ZIP-YEAR|min(BUYER_COUNTY)|count(BUYER_CITY)|sum(STRENGTH)|sum(QUANTITY)|sum(CALC_BASE_WT_IN_GM)|      sum(dos_str)|  avg(DOSAGE_UNIT)|sum(DRUG_NAME_HYDROCODONE)|sum(DRUG_NAME_OXYCODONE)|sum(BUYER_BUS_ACT_CHAIN PHARMACY)|sum(BUYER_BUS_ACT_PRACTITIONER)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/100)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/275)|sum(BUYER_BUS_ACT_PRACTITIONER-DW/30)|sum(BUYER_BUS_ACT_RETAIL PHARMACY)|Count|    SUI_per_thousand|      Pop_per_sqmi|
+----------+-----------------+-----------------+-------------+-------------+----------------

## Rename all of the columns in the dataframe

In [17]:
# Replacement column names for drug_death_df, applied by position: entry i
# renames the i-th column of the joined frame, so the order here must match
# the column order of drug_death_df.
new_names = ['ZIP_YEAR',
 'min_BUYER_COUNTY',
 'BUYER_CITY_count',
 'STRENGTH_sum',
 'QUANTITY_sum',
 'CALC_BASE_WT_IN_GM_sum',
 'dos_str_sum',
 'DOSAGE_UNIT_avg',
 'DRUG_NAME_HYDROCODONE_sum',
 'DRUG_NAME_OXYCODONE_sum',
 'BUYER_BUS_ACT_CHAIN_PHARMACY_sum',
 'BUYER_BUS_ACT_PRACTITIONER_sum',
 'BUYER_BUS_ACT_PRACTITIONER_DW_100_sum',
 'BUYER_BUS_ACT_PRACTITIONER_DW_275_sum',
 'BUYER_BUS_ACT_PRACTITIONER_DW_30_sum',
 'BUYER_BUS_ACT_RETAIL_PHARMACY_sum',
 'Count',
 'SUI_per_thousand',
 'Pop_per_sqmi']

# Snapshot of the current column names, in column order
old_names = drug_death_df.schema.names

In [18]:
# Rename every column by position in a single call. toDF raises when the number
# of names differs from the number of columns, whereas the index-driven
# withColumnRenamed loop would fail midway or silently ignore surplus names.
drug_death_df = drug_death_df.toDF(*new_names)

# IRS Data

In [19]:
# Read the IRS TSV; each text line is split on tabs into a list of string fields
irs_rdd = sc.textFile('s3://data-systems-opioid/IRS_Tax_Data_06_to_12.tsv').map(lambda x: x.split('\t'))

In [20]:
def toInteger(value):
    """Convert `value` to an int by way of float, truncating toward zero.

    Going through float() lets text such as "90210.0" be accepted. Unlike
    IntegerSafe, conversion errors are not caught and propagate to the caller.
    """
    as_float = float(value)
    return int(as_float)

# The header row supplies the column names
col_names = irs_rdd.first()

# Drop the header row, cast the first field and the field at index 223 to
# integers (fields 1-222 stay strings), and cache the result
irs_rdd = (
    irs_rdd
    .filter(lambda row: row != col_names)
    .map(lambda row: [toInteger(row[0])] + row[1:223] + [toInteger(row[223])])
    .persist()
)

# To SQL DataFrame, with the ZIP_YEAR key used for the join
irs_df = (
    irs_rdd.toDF(col_names)
    .withColumn('ZIP_YEAR', concat(col("ZipCode"), lit("-"), col("Year")))
)

# Join the IRS Data to the drug_death dataset

In [21]:
# Left outer join: every drug_death_df row is kept; IRS columns are null where
# no ZIP_YEAR matches
drug_death_irs = drug_death_df.join(irs_df, on="ZIP_YEAR", how="leftouter")

# Fill NULLS

In [22]:
# Fills the na's with the mean of the columns
# (every column except the first one, the ZIP_YEAR join key)
mean_dict = {name: 'mean' for name in drug_death_irs.columns[1:]}
col_avgs = drug_death_irs.agg(mean_dict).collect()[0].asDict()
# The aggregate columns come back named "avg(<column>)"; strip the wrapper to
# recover the original name. Columns whose mean is None (all-null or
# non-numeric) are skipped, because fillna does not accept None as a
# replacement value.
col_avgs = {k[len('avg('):-1]: v for k, v in col_avgs.items() if v is not None}
drug_death_irs = drug_death_irs.fillna(col_avgs)

# Write to Parquet

In [23]:
# Parquet files have certain restricted characters.
# Spark's Parquet writer rejects any of " ,;{}()\n\t=" in a column name; '$'
# and '-' are replaced as well, as in the original list.
chars_to_remove = ('$', ' ', '-', ',', ';', '{', '}', '(', ')', '\n', '\t', '=')
to_underscore = str.maketrans({char: '_' for char in chars_to_remove})

# Sanitised names, in the same order as the DataFrame's columns
cols_before = [name.translate(to_underscore) for name in drug_death_irs.columns]

# Positional rename in one call instead of one withColumnRenamed per column
drug_death_irs = drug_death_irs.toDF(*cols_before)


In [24]:
# Save as table 'drug_death' with its files under ./drug_death.
# NOTE(review): no write mode is set, so re-running this cell once the table
# exists presumably fails — confirm, and consider an explicit .mode(...).
drug_death_irs.write.option("path", "./drug_death").saveAsTable('drug_death')

In [25]:
# Elapsed wall-clock time since start_time was recorded in the first cell
print("--- %s seconds ---" % (time.time() - start_time))

--- 188.3981957435608 seconds ---


# Read from Parquet

In [26]:
# Query the files under ./drug_death directly by path as Parquet
# (assumes the table above was written in Parquet format — TODO confirm)
ss.sql("select * from parquet.`./drug_death`").show()

+----------+----------------+----------------+------------------+------------+----------------------+-----------+------------------+-------------------------+-----------------------+--------------------------------+------------------------------+-------------------------------------+-------------------------------------+------------------------------------+---------------------------------+-----+--------------------+------------------+-------+--------------------------+---------------------------------+--------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------------+---------------------------------------+--------------------------------+-------------------------------------+-------------------------------------+--------------------------------------+------------------------------------------+-------------------------------------------------+------------------------------------------+-----------

In [27]:
# Shut down the SparkContext; cells using sc after this point need a new context
sc.stop()