# Pulitzer Insights

# 1.0 Objective:
The goal is to find a relationship between Pulitzer to GDP, Crime, and Population. The dataset is provided by the various organization as listed in the Source and Credits section. 

I would also like to check if:
Q1. Newspaper with the maximum number of Pulitzer prices? 
Q2. What are the top 5 states
Q3. To find if there is any Correlation between Crime, GDP, and Population on Pulitzer? For example - higher GDP means more prices (confounding parameter could be more journalists) or crime-prone cities incubate investigative journalism resulting in more Pulitzer.
Q4. If there is any correlation between daily circulation and crime rate?
Q5. To find socioeconomic factors affecting Pulitzer.

# 2.0 Data Source and Credits:
Raw Data Source and Credits:
Pulitzer Data is available FiveThirtyEight GIT HUB site <br>
https://github.com/fivethirtyeight/data/blob/master/pulitzer/pulitzer-circulation-data.csv 


Crime Data by State and US: <br>
http://www.usa.com/rank/us--crime-index--state-rank.htm <br>
https://ucr.fbi.gov/crime-in-the-u.s/2014/crime-in-the-u.s.-2014/tables/table-1 <br>

GDP for US and States: <br>
https://www.usgovernmentrevenue.com/download_multi_year_2000_2014USb_19c1li101mcn_F1cF0t <br>

Population by State and US: <br>
https://www.census.gov/data/datasets/2016/demo/popest/state-total.html

# 3.0 Prerequisites and Environment:
1. Jupyter server running in Google Cloud Platform ("GCP")
2. Spark slaves running on GCP
3. Cassandra cluster running on GCP
4. Spark master and jupyter server running on same VM.

**Please see my blog for detail setup.**

# 4.0 Approach:
The solution is specifically designed to achieve a controlled and structured approach to minimize data quality, relationship and format issues that may be present. <br>
The solution is sub-divided into three phases: <br>

### 4.1 Data Assembly - Phase I:
This phase of the project is designed to gather and do basic cleanup like join, merge, add or update attributes.

### 4.2 Data Preparation - Phase II: 
This phase of the project is designed to merge data collected from a persistent and reproducible source. At this stage, the data is clean and identifiable but has not gone thru any required transformation.

### 4.3 Analysis and Discovery - Phase III: 
This phase of the project is designed to validate and explore the dataset for all the problems listed in the “Objective” section of this notebook.

Please note that the above steps are NOT sequential. I plan to take an iterative approach to improve the dataset accuracy.

# 4.1 Data Preparation - Phase I

### 4.1.1 Gathering Pulitzer Data

FiveThirtyEight.com is the source of the Pulitzer raw data("Pulitzer"). US State, GDP, population or crime data are not mapped to Pulitzer. To map these other dimensions we 1st need to assign the raw Pulitzer data with a US State. This activity is done manually based on the name of the newspaper.

Step 1: Adding US state mapping to newspaper data. This step is done manually due to lack of common source, i.e., mapping newspaper to state not available. <br>
Step 2: Upload the file to GCP bucket so that it can be accessed from the Jupyter server which is also running in GCP. <br>
Step 2(Optional): Please check my blog for how to setup Jupyter, Spark and Cassandra cluster in GCP. <br>

### Assumption: For national or international newspaper I have used the HO state as the state.

### 4.1.1.1 Read and Prepare Pulitzer Data

In [2]:
import pandas as pd
from google.cloud import storage

import datetime as dt
from datetime import datetime
from pytz import timezone

import uuid

#Reading Google Buckets for files
client = storage.Client()
bucket=client.get_bucket('capstone_project_sr')
blob = storage.Blob('pulitzer.csv',bucket)
with open('pulitzer.csv', 'wb') as file_obj:
    blob.download_to_file(file_obj)
df=pd.read_csv('pulitzer.csv',sep=',',header=0, \
               names=['Newspaper','state','DailyCirculation_2004',\
               'DailyCirculation_2013',\
               'ChangeInDailyCirculation_2004_2013',\
               'WinnersAndFinalists_1990_2003',\
               'WinnersAndFinalists_2004_2014',\
               'WinnersAndFinalists_1990_2014'])

Adding basic data audit fields just in case we run into data conflicts in later phases of the project.

In [3]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
df['Entrydate'] = dt.datetime.now()

df.insert(0,'Id',uuid.uuid4()) 
df.Id= df.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

In [4]:
df.info() # checking the data frame structure to make type change if required

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 10 columns):
Id                                    50 non-null object
Newspaper                             50 non-null object
state                                 50 non-null object
DailyCirculation_2004                 50 non-null object
DailyCirculation_2013                 50 non-null object
ChangeInDailyCirculation_2004_2013    50 non-null object
WinnersAndFinalists_1990_2003         50 non-null int64
WinnersAndFinalists_2004_2014         50 non-null int64
WinnersAndFinalists_1990_2014         50 non-null int64
Entrydate                             50 non-null datetime64[ns]
dtypes: datetime64[ns](1), int64(3), object(6)
memory usage: 4.0+ KB


In [6]:
df.sample(3) #number are stored with commas which we need to replace before stat analysis.

Unnamed: 0,Id,Newspaper,state,DailyCirculation_2004,DailyCirculation_2013,ChangeInDailyCirculation_2004_2013,WinnersAndFinalists_1990_2003,WinnersAndFinalists_2004_2014,WinnersAndFinalists_1990_2014,Entrydate
21,c04ef147-c92a-4f36-a461-78f60a3030d3,Cleveland Plain Dealer,Ohio,367528,311605,-15%,4,7,11,2017-11-18 15:48:36.202228
48,57a414ac-1d58-4641-bc75-a1b59fe43c86,Louisville Courier-Journal,Kentucky,216934,131208,-40%,0,3,3,2017-11-18 15:48:36.202228
26,ca563ebc-8d9a-4a84-b829-e308b5677066,Oregonian,Oregon,339169,228909,-33%,9,8,17,2017-11-18 15:48:36.202228


### 4.1.1.2 Inserting Pulitzer Data into Cassandra
The step is done for persistence and reliability among other benefits using a database cluster.

Reading the connection points details.

In [67]:
df_con=pd.read_csv('~/connection_point.csv',header=0) # this is done to make add basic level of security. 
#Please note that this file is not uploaded. It is only present in the Jupyter server. 

In [68]:
import itertools
from multiprocessing import Pool
import sys
import time
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
from cassandra.auth import PlainTextAuthProvider

def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.pulitzer \
                             (id,Newspaper,state,DailyCirculation_2004,DailyCirculation_2013, \
                             ChangeDailyCirculation_2004_2013,WinNFinalists_1990_2003, \
                             WinNFinalists_2004_2014,WinNFinalists_1990_2014,Entrydate) \
                             VALUES (?,?,?,?,?,?,?,?,?,?)")
    
    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+100],)) for n in range(0, len(params),100)]
    return results
    

if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(df.values):        
        (a,b,c,d,e,f,g,h,i,j) = row
        row1=(a,str(b),str(c),str(d),str(e),str(f),str(g),str(h),str(i),j)
        parameters.append(row1)           
    a = multiprocess(parameters)

### 4.1.2 Gathering US State and Country GDP
GDP data is one file per state or country for example "usgs_1957_2015-1.csv", this file holds NJ specific GDP starting from 1957 to 2015. I have downloaded 50 + 1 data files from the site and stored it in a local data folder. There are 50(states) + 1 for US files in the data directory. <br>

### 4.1.2.1 Read and Prepare GDP Data

Step 1: Loading 51 files in a list for bulk processing. <br>
Step 2: Handling file format issues while reading i.e. skipping few top lines, bad lines(engine = python) and selecting the required columns. <br>
Step 4: Changing the columns to sync with the Pulitzer data frame.<br>
Step 3: Merging the data frame vertically. Merging may throw few warnings which can be ignored. <br>

In [2]:
import os
import pandas as pd
files = [file for file in os.listdir( './Data' ) \
         if file.startswith("usgs_1957_2015")]
gdp_merged=pd.DataFrame()

for file_ in files:
    filename='./Data/'+file_
    df = pd.read_csv(filename,skiprows=0,header=1,skipfooter=6,quoting=3,error_bad_lines=False, engine='python', usecols=range(0,3))
    df['State']=df.columns[2].split('-')[1][:2]
    del df[df.columns[2]]
    df.columns=['year','GDP-billion','state']
    gdp_merged=pd.concat([gdp_merged,df],ignore_index=1,axis=0)

Adding basic data audit fields just in case we run into data conflicts in later phases of the project.

In [74]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
gdp_merged['Entrydate'] = dt.datetime.now()

gdp_merged.insert(0,'Id',uuid.uuid4()) 
gdp_merged.Id= gdp_merged.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

### 4.1.2.2 Inserting GDP data to Cassandra

In [75]:
def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.GDP(id,year,GDP,state,entrydate) VALUES (?,?,?,?,?)")
    
    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+100],)) for n in range(0, len(params),100)]
    return results
    

if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(gdp_merged.values):        
        (a,b,c,d,e) = row
        row1=(a,str(b),int(c),str(d),i)
        parameters.append(row1)           
    a = multiprocess(parameters)

### 4.1.3 Gathering Crime Data
Crime data are collected from two different sources thus we need to merge it at the end. I have only considered crime index and violent crime for analysis.


### 4.1.3.1 Preparing Crime Index Data
Step 1: Accessing GCP bucket for the crime excel files. <br>
Step 2: Adding audit fields. <br>
Step 3: Spliting dataframe column to get year and state. <br>

In [44]:
blob = storage.Blob('CrimeIndex.xlsx',bucket)
with open('Crimeindex.xlsx', 'wb') as file_obj:
    blob.download_to_file(file_obj)
    
xl=pd.ExcelFile('Crimeindex.xlsx')
df_crime=xl.parse('Sheet1',header=0)

In [46]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
df_crime['Entrydate'] = dt.datetime.now()

df_crime.insert(0,'Id',uuid.uuid4()) 
df_crime.Id= df_crime.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

In [57]:
df_crime.columns=['Id','rank','crimeindex','state-population','Entrydate']

In [83]:
df_crime.insert(4,'state','')
df_crime['state']=df_crime['state-population'].apply(lambda x: x.split('/')[0][:-1])

In [85]:
df_crime.insert(5,'population','')
df_crime['population']=df_crime['state-population'].apply(lambda x: x.split('/')[1].strip())

In [88]:
del df_crime[df_crime.columns[3]]

In [3]:
#df_crime.sample(3) #['state-population'].values[0].split('/')[1].strip()

### 4.1.3.2 Inserting Crime Index Data into Cassandra

In [100]:
def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.crimeindex(id,rank,\
                              crimeindex,state,population,entrydate)\
                              VALUES (?,?,?,?,?,?)")
    
    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+100],)) for n in range(0, len(params),100)]
    return results
    

if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(df_crime.values):        
        (a,b,c,d,e,f) = row
        row1=(a,int(b),int(c),str(d),int(e.replace(',','')),f)
        parameters.append(row1)           
    a = multiprocess(parameters)

### 4.1.4 Gathering Crime by US State and US
Crime tables are downloaded and uploaded to GCP for easy access. 

### 4.1.4.1 Preparing Crime by US State and US

Step 1: Accessing Crime Tables excel file from GCP <br>
Step 2: Parsing it - skiping few top rows and bad lines <br>
Step 3: Processing the year as it is merged into one data <br>
Step 4: Changing the Data frame columns to sync with Pulitzer <br>
Step 5: Adding Audit fields. <br>
Step 6: Inserting data into Cassandra tables. <br>

In [103]:
blob = storage.Blob('table_1_crime_in_the_united_states_by_volume_and_rate_per_100000_inhabitants_1995-2014.xls',bucket)
with open('table_1_crime_in_the_united_states_by_volume_and_rate_per_100000_inhabitants_1995-2014.xls', 'wb') as file_obj:
    blob.download_to_file(file_obj)

In [104]:
filename='table_1_crime_in_the_united_states_by_volume_and_rate_per_100000_inhabitants_1995-2014.xls'
xl=pd.ExcelFile(filename)
df_crime_byvol=xl.parse('14tbl01',skiprows=3,header=0,skipfooter=10,error_bad_lines=False, engine='python', usecols=range(0,4))

In [106]:
df_crime_byvol['Year']=df_crime_byvol.Year.apply(lambda x: int(str(x)[:4]))

In [108]:
df_crime_byvol.columns=['year','population','violentcrime','violentcrimerate']

In [110]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
df_crime_byvol['Entrydate'] = dt.datetime.now()

df_crime_byvol.insert(0,'Id',uuid.uuid4()) 
df_crime_byvol.Id= df_crime.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

In [111]:
df_crime_byvol

Unnamed: 0,Id,year,population,violentcrime,violentcrimerate,Entrydate
0,921006ea-e4ad-412f-b0c8-9a494feb08fb,1995,262803276,1798792,684.5,2017-10-23 00:27:37.706506
1,3cda340b-794f-49da-9b73-82fdd189c67d,1996,265228572,1688540,636.6,2017-10-23 00:27:37.706506
2,8d58a704-0aa5-46a8-9868-e581592bffc1,1997,267783607,1636096,611.0,2017-10-23 00:27:37.706506
3,bfac0bd3-8fc6-4cf0-b5bd-16a9a35117fa,1998,270248003,1533887,567.6,2017-10-23 00:27:37.706506
4,80be7628-bd41-44c6-87fb-310667d546cc,1999,272690813,1426044,523.0,2017-10-23 00:27:37.706506
5,69f9c891-aa66-40e3-81d7-b2cd728418be,2000,281421906,1425486,506.5,2017-10-23 00:27:37.706506
6,e1fd6370-291f-44e4-b742-5b34689a5d87,2001,285317559,1439480,504.5,2017-10-23 00:27:37.706506
7,c5030888-1574-49e9-baea-48410b12a52d,2002,287973924,1423677,494.4,2017-10-23 00:27:37.706506
8,edce9436-225c-4d11-99b9-ca8d85bbfcfc,2003,290788976,1383676,475.8,2017-10-23 00:27:37.706506
9,1b4b77c0-7642-4125-869a-d2e6b5ba52a9,2004,293656842,1360088,463.2,2017-10-23 00:27:37.706506


### 4.1.4.2 Inserting Crime by Volume into Cassandra

In [112]:
def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.crimebyvol(id,year,population, \
                              violentcrime,violentcrimerate,entrydate) \
                              VALUES (?,?,?,?,?,?)")
    
    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+100],)) for n in range(0, len(params),100)]
    return results
    
if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(df_crime_byvol.values):        
        (a,b,c,d,e,f) = row
        row1=(a,str(b),int(c),int(d),int(e),f)
        parameters.append(row1)           
    a = multiprocess(parameters)

### 4.1.5 Gathering Population Data 2000 to 2016
Population data is collected over two censuses.

### 4.1.5.1 Preparing Population Data

Step 1: Reading population from 2000 - 2009 data files <br>
Step 2: Selecting the required fields <br>
Step 3: Filtering data to make it demographic agnostic <br> 
Step 4: Dropping not needed data fields <br>

In [14]:
import pandas as pd
filename='./Data/st-est00int-alldata.csv'
df_=pd.read_csv(filename,header=0,usecols=[2,3,4,5,6,7,9,10,11,12,13,14,15,16,17,18])

In [15]:
df_.columns

Index(['STATE', 'NAME', 'SEX', 'ORIGIN', 'RACE', 'AGEGRP', 'POPESTIMATE2000',
       'POPESTIMATE2001', 'POPESTIMATE2002', 'POPESTIMATE2003',
       'POPESTIMATE2004', 'POPESTIMATE2005', 'POPESTIMATE2006',
       'POPESTIMATE2007', 'POPESTIMATE2008', 'POPESTIMATE2009'],
      dtype='object')

This step is filtering the data frame to remove data related to demographics.

In [18]:
df_pop=df_[(df_.SEX==0) & (df_.ORIGIN==0) & (df_.RACE==0) & (df_.AGEGRP==0)]

Dropping not needed fields.

In [19]:
df_=df_pop.drop(['SEX', 'ORIGIN', 'RACE', 'AGEGRP'], axis=1)

In [20]:
df_.columns # Please make a note of the estimated figures for 2000 to 2009 based on state

Index(['STATE', 'NAME', 'POPESTIMATE2000', 'POPESTIMATE2001',
       'POPESTIMATE2002', 'POPESTIMATE2003', 'POPESTIMATE2004',
       'POPESTIMATE2005', 'POPESTIMATE2006', 'POPESTIMATE2007',
       'POPESTIMATE2008', 'POPESTIMATE2009'],
      dtype='object')

### Reading population data 2010 to 2016 

Step 1: Reading the 2000 census data 
Step 2: Selecting the fields matching the 2000 to 2009 population dataframe
Step 3: We have to merge the dataframe to get the consolidated population dataframe
Step 4: Drop the unwanted fields
Step 5: Adding Audit fields
Step 6: Inserting data into Cassandra tables.

In [16]:
filename='./Data/nst-est2016-alldata.csv'
df_1=pd.read_csv(filename,header=0,usecols=[1,3,4,7,8,9,10,11,12,13])

In [17]:
df_1.columns #Please make a note of the fields. 

Index(['REGION', 'STATE', 'NAME', 'POPESTIMATE2010', 'POPESTIMATE2011',
       'POPESTIMATE2012', 'POPESTIMATE2013', 'POPESTIMATE2014',
       'POPESTIMATE2015', 'POPESTIMATE2016'],
      dtype='object')

Merging the dataframes to create a unified data frame for population.

In [21]:
df_pop_merged=pd.merge(df_1,df_,how='left', on='NAME')

Checking the merged dataframe for correctness.

In [26]:
df_pop_merged.columns#sample(15,axis=1)#.info()

Index(['REGION', 'STATE_x', 'NAME', 'POPESTIMATE2010', 'POPESTIMATE2011',
       'POPESTIMATE2012', 'POPESTIMATE2013', 'POPESTIMATE2014',
       'POPESTIMATE2015', 'POPESTIMATE2016', 'STATE_y', 'POPESTIMATE2000',
       'POPESTIMATE2001', 'POPESTIMATE2002', 'POPESTIMATE2003',
       'POPESTIMATE2004', 'POPESTIMATE2005', 'POPESTIMATE2006',
       'POPESTIMATE2007', 'POPESTIMATE2008', 'POPESTIMATE2009'],
      dtype='object')

Dropping the merge columns

In [27]:
df_pop_merged_=df_pop_merged.drop(['STATE_x', 'STATE_y'], axis=1)

Adding audit fileds

In [31]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
df_pop_merged_['Entrydate'] = dt.datetime.now()

df_pop_merged_.insert(0,'Id',uuid.uuid4()) 
df_pop_merged_.Id= df_pop_merged_.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

In [None]:
#df_pop_merged_.sample(3)#.columns

### 4.1.5.2 Inserting Population Data to Cassandra

In [10]:
import itertools
from multiprocessing import Pool
import sys
import time
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
from cassandra.auth import PlainTextAuthProvider

In [42]:
def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.population(id,region,state,   \
                              POPESTIMATE2000, POPESTIMATE2001, POPESTIMATE2002, \
                              POPESTIMATE2003, POPESTIMATE2004, POPESTIMATE2005, \
                              POPESTIMATE2006, POPESTIMATE2007, POPESTIMATE2008, \
                              POPESTIMATE2009, POPESTIMATE2010, POPESTIMATE2011, \
                              POPESTIMATE2012, POPESTIMATE2013, POPESTIMATE2014, \
                              POPESTIMATE2015, POPESTIMATE2016,entrydate)                  \
                              VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
    
    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+100],)) for n in range(0, len(params),100)]
    return results
    
if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(df_pop_merged_.values):        
        (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u) = row
        row1=(a,str(b),c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u)
        parameters.append(row1)           
    a = multiprocess(parameters)
    

# 4.2 Reading data from Cassandra Tables - Phase - II
### The next stage of the project is to read data from Cassandra tables and logically group them for analysis

This section is not optimized wrt code for example we can writena function and call it for Cassandra data access.

In [19]:
import itertools
from multiprocessing import Pool
import sys
import time
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import tuple_factory
from cassandra.auth import PlainTextAuthProvider


import pandas as pd
from google.cloud import storage
import datetime as dt
from datetime import datetime
from pytz import timezone
import uuid

In [20]:
df_con=pd.read_csv('~/connection_point.csv',header=0)  # This is done to add basic level of security
df_code=pd.read_csv('~/states_code.txt',header=0,sep='\t') # this is dict will serve state or two digit mapping lookup
states_dict=dict(zip(df_code.Abbreviation,df_code.State)) # creation of dict. This will be used later in the project.

In [21]:
from IPython.display import display
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def _fetchData(query_):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = pandas_factory    
    rows = session.execute(query_)
    return rows

if __name__ == "__main__":
    query_pram="SELECT  Newspaper,state,DailyCirculation_2004, \
                DailyCirculation_2013,ChangeDailyCirculation_2004_2013,\
                WinNFinalists_1990_2003,WinNFinalists_2004_2014,WinNFinalists_1990_2014\
                FROM capstone.pulitzer"
    rows = _fetchData(query_pram)
    df_pulitzer=rows._current_rows
    #print(display(df_pulitzer))
    
    #
    query_pram="SELECT  gdp,state,year FROM capstone.gdp"
    rows = _fetchData(query_pram)
    df_gdp=rows._current_rows
    #print(display(df_gdp))  

In [22]:
df_gdp.state=df_gdp.state.apply(lambda x: states_dict[x]) #Using the state dict to convert.

In [5]:
#df_pulitzer

### 4.2.1 Adding 2004 GDP Data to the Pulitzer
Pulitzer only have data related to 2004,2013,2014. So we have to filter all data related to 2004,2014 and 2014 from GDP and merge it to Pulitzer.

bfd n
Step 1: Adding 2004 Data to Pulitzer <br>
Step 2: Adding 2013 Data to Pulitzer <br>
Step 3: Adding 2014 Data to Pulitzer <br>


In [27]:
df_gdp_sorted=df_gdp[df_gdp.year=='2004'] #sorting the data frame to select
df_gdp_sorted.columns=['GDP_2004','state','year_2004'] #changing columns to sync with Pulitzer
df__=pd.merge(df_pulitzer,df_gdp_sorted.drop_duplicates(),how='left', on='state') 
# merging the data frames to add GDP data.

In [8]:
#df__.sample(3)

Adding 2013 GDP Data

In [29]:
df_gdp_sorted=df_gdp[df_gdp.year=='2013']
df_gdp_sorted.columns=['GDP_2013','state','year_2013']
df__=pd.merge(df__,df_gdp_sorted.drop_duplicates(),how='left', on='state')

In [7]:
#df__.sample(1)

Adding 2014 GDP Data

In [31]:
df_gdp_sorted=df_gdp[df_gdp.year=='2014']
df_gdp_sorted.columns=['GDP_2014','state','year_2014']
df__=pd.merge(df__,df_gdp_sorted.drop_duplicates(),how='left', on='state')

In [9]:
#df__

### 4.2.2 Adding Crime Data

In [33]:
from IPython.display import display
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def _fetchData(query_):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = pandas_factory    
    rows = session.execute(query_)
    return rows

if __name__ == "__main__":
    query_pram="SELECT  crimeindex,state FROM capstone.crimeindex"
    rows = _fetchData(query_pram)
    df_crimeindex=rows._current_rows
    #print(display(df_pulitzer))
    
    #
    query_pram="SELECT  violentcrime,year FROM capstone.crimebyvol"
    rows = _fetchData(query_pram)
    df_crimebyvol=rows._current_rows
    #print(display(df_gdp))  

In [34]:
#df_crimeindex
df__=pd.merge(df__,df_crimeindex,how='left', on='state')

In [12]:
#df__.sample(3)

Adding violentcrime data for 2004

In [36]:
df_crimebyvol_sorted=df_crimebyvol[df_crimebyvol.year=='2004']
#df_crimebyvol_sorted.insert(2,'state','US')
df_crimebyvol_sorted.columns=['violentcrime_2004','year_2004']
df__=pd.merge(df__,df_crimebyvol_sorted,how='left', on='year_2004')

In [10]:
#df__.sample(9)

Adding violent crime data for 2013 to Pulitzer

In [38]:
df_crimebyvol_sorted=df_crimebyvol[df_crimebyvol.year=='2013']
df_crimebyvol_sorted.columns=['violentcrime_2013','year_2013']
df__=pd.merge(df__,df_crimebyvol_sorted,how='left', on='year_2013')

In [11]:
#df__.sample(5)

Adding violent crime for 2014

In [40]:
df_crimebyvol_sorted=df_crimebyvol[df_crimebyvol.year=='2014']
df_crimebyvol_sorted.columns=['violentcrime_2014','year_2014']
df__=pd.merge(df__,df_crimebyvol_sorted,how='left', on='year_2014')

In [13]:
#df__#.columns

### 4.2.3 Adding Population Data

In [42]:
from IPython.display import display
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def _fetchData(query_):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = pandas_factory    
    rows = session.execute(query_)
    return rows

if __name__ == "__main__":
    query_pram="SELECT  * FROM capstone.population"
    rows = _fetchData(query_pram)
    df_popu=rows._current_rows
    #print(display(df_pulitzer))


In [43]:
df_popu_sorted=df_popu[['popestimate2004','popestimate2013','popestimate2014','state']]

In [44]:
df__=pd.merge(df__,df_popu_sorted,how='left', on='state')

In [14]:
#df__.sample(8)

In [46]:
df__.columns

Index(['newspaper', 'state', 'dailycirculation_2004', 'dailycirculation_2013',
       'changedailycirculation_2004_2013', 'winnfinalists_1990_2003',
       'winnfinalists_2004_2014', 'winnfinalists_1990_2014', 'GDP_2004',
       'year_2004', 'GDP_2013', 'year_2013', 'GDP_2014', 'year_2014',
       'crimeindex', 'violentcrime_2004', 'violentcrime_2013',
       'violentcrime_2014', 'popestimate2004', 'popestimate2013',
       'popestimate2014'],
      dtype='object')

In [15]:
#df__

In [48]:
df__=df__.drop(['year_2004', 'year_2013', 'year_2014'], axis=1)

Adding Audit Fields

In [49]:
tz = timezone('EST') # adding time zone info
datetime.now(tz) 
df__['Entrydate'] = dt.datetime.now()

df__.insert(0,'Id',uuid.uuid4()) 
df__.Id= df__.Id.apply(lambda x: uuid.uuid4()) # adding unique identifier

In [54]:
df__.to_csv('pulitzerFinal.csv') # This step is just for backup. We have to insert the merged data to Cassandra

Index(['Id', 'newspaper', 'state', 'dailycirculation_2004',
       'dailycirculation_2013', 'changedailycirculation_2004_2013',
       'winnfinalists_1990_2003', 'winnfinalists_2004_2014',
       'winnfinalists_1990_2014', 'GDP_2004', 'GDP_2013', 'GDP_2014',
       'crimeindex', 'violentcrime_2004', 'violentcrime_2013',
       'violentcrime_2014', 'popestimate2004', 'popestimate2013',
       'popestimate2014', 'Entrydate'],
      dtype='object')

### 4.2.4 Inserting Pulitzer Final Data into Cassandra

This step concludes our Data Gathering and Preparations. Now, we move to Data Discovery.

In [69]:
def _insertData(params):
    cluster = Cluster(contact_points=[df_con.ip[0]], auth_provider = \
                      PlainTextAuthProvider(username=df_con.user[0], \
                                            password=df_con.token[0]))
    session = cluster.connect()
    session.set_keyspace('capstone')
    session.row_factory = tuple_factory
    prepared=session.prepare("INSERT INTO capstone.pulitzerfinal(id,newspaper,state, \
                              dailycirculation_2004,dailycirculation_2013, \
                              changedailycirculation_2004_2013, winnfinalists_1990_2003, \
                              winnfinalists_2004_2014,winnfinalists_1990_2014,GDP_2004, \
                              GDP_2013, GDP_2014, crimeindex, violentcrime_2004,violentcrime_2013, \
                              violentcrime_2014, popestimate2004, popestimate2013, \
                              popestimate2014, entrydate) \
                              VALUES (?,?,?, \
                                      ?,?, \
                                      ?,?, \
                                      ?,?,?, \
                                      ?,?,?,?,?, \
                                      ?,?,?, \
                                      ?,?)")

    #using datastax driver for multiprocessing 
    execute_concurrent_with_args(session, prepared, params, concurrency=50) 
    return None

def multiprocess(params):
    pool = Pool(processes=4)
    results = [pool.map(_insertData, (params[n:n+10],)) for n in range(0, len(params),10)]
    return results
    

if __name__ == "__main__":
    parameters=[]
    for index, row in enumerate(df__.values):        
        (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,a11,a12,a13,a14,a15,a16,a17,a18,a19,a20) = row
        row1=(a1,str(a2),str(a3),str(a4),str(a5),str(a6),str(a7),str(a8),str(a9), \
              str(a10),str(a11),str(a12),str(a13),str(a14),str(a15),str(a16),str(a17),str(a18),str(a19),a20)
        parameters.append(row1)           
    a = multiprocess(parameters)  