# Credits

Datasets used in this project:

Kaggle - USstates Dataset - analysis with pandas - Giovanna de Vincenzo
simplemaps - United States Cities Database (Free version) - Creative Commons Attribution 4.0
data.world - US Economic Census Core Data by Industry, 2012 and 2007 - BY GARY HOOVER
data.world - 2012 US Industry Data - BY GARY HOOVER
data.world - INC 5000 - 2019 - BY AURIELLE PERLMANN
data.world - Free 7+ Million Company Dataset - IN PEOPLE DATA LABS

In [1]:
import configparser
import findspark 
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark import SparkContext
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, FloatType

In [2]:
spark = SparkSession.builder.appName('localbusiness').getOrCreate()

# Loading cities / countries (dimensions)

In [3]:
uscities = spark.read.csv('./data/uscities.csv',header=True)
uscities.head()

Row(city='South Creek', city_ascii='South Creek', state_id='WA', state_name='Washington', county_fips='53053', county_name='Pierce', county_fips_all='53053', county_name_all='Pierce', lat='46.9994', lng='-122.3921', population='2500', density='125', source='polygon', military='FALSE', incorporated='TRUE', timezone='America/Los_Angeles', ranking='3', zips='98580 98387 98338', id='1840116412')

In [4]:
stateabbrevs = spark.read.csv('./data/state-abbrevs.csv',header=True)
stateabbrevs.head()

Row(state='Alabama', abbreviation='AL')

In [5]:
statepopulation = spark.read.csv('./data/state-population.csv',header=True)
statepopulation.head(5)

[Row(state/region='AL', ages='under18', year='2012', population='1117489'),
 Row(state/region='AL', ages='total', year='2012', population='4817528'),
 Row(state/region='AL', ages='under18', year='2010', population='1130966'),
 Row(state/region='AL', ages='total', year='2010', population='4785570'),
 Row(state/region='AL', ages='under18', year='2011', population='1125763')]

# Loading companies (dimension)

In [6]:
companies_sorted = spark.read.csv('./data/companies_sorted.csv',header=True)
companies_sorted.head(2)

[Row(_c0='5872184', name='ibm', domain='ibm.com', year founded='1911', industry='information technology and services', size range='10001+', locality='new york, new york, united states', country='united states', linkedin url='linkedin.com/company/ibm', current employee estimate='274047', total employee estimate='716906'),
 Row(_c0='4425416', name='tata consultancy services', domain='tcs.com', year founded='1968', industry='information technology and services', size range='10001+', locality='bombay, maharashtra, india', country='india', linkedin url='linkedin.com/company/tata-consultancy-services', current employee estimate='190771', total employee estimate='341369')]

# Loading Eco metrics (fact) + Sectors (dimension)

In [7]:
ecocensusbyindustry = spark.read.option('delimiter', ';').csv('./data/ecocensusbyindustry.csv',header=True)
ecocensusbyindustry.head()

Row(Meaning of 2007 NAICS code='Mining, quarrying, and oil and gas extraction', 2007 NAICS code='21', Year='2012', Number of establishments='25,417', Value of sales, shipments, receipts, revenue, or business done ($1,000)='529,239,818',  Annual payroll ($1,000) =' 59,461,950 ', Number of paid employees for period including March 12='848,189')

In [8]:
industrydatabystate = spark.read.option('delimiter', ';').csv('./data/industrydatabystate.csv',header=True)
industrydatabystate.head()

Row(Geographic area name='United States', 2012 NAICS code='21', Meaning of 2012 NAICS code='Mining, quarrying, and oil and gas extraction', Meaning of Type of operation or tax status code='Total', Number of establishments='28,643',  Value of sales, shipments, receipts, revenue, or business done ($1,000) =' 555,174,196 ', Annual payroll ($1,000)='61,331,381', First-quarter payroll ($1,000)='N', Number of employees='903,641', Number of nonemployer establishments='109,931',  Nonemployer value of sales, shipments, receipts, revenue, or business done ($1,000) =' 7,820,264 ')

In [9]:
ecoinc5000 = spark.read.csv('./data/ecoinc5000-2019.csv',header=True)
ecoinc5000.head()

Row(rank='1', profile='https://www.inc.com/profile/freestar', name='Freestar', url='http://freestar.com', state='AZ', revenue='36.9 Million', growth='36680.3882', industry='Advertising & Marketing', workers='40', founded='2015', yrs_on_list='1', previous_workers='5', metro='Phoenix', city='Phoenix')

# Creating spark SQL tables

## Dimension tables

In [10]:
# SECTOR dimension table
sector_table1 = ecocensusbyindustry.select(col("2007 NAICS code").alias('naics'),
                                          col('Meaning of 2007 NAICS code').alias('name'))
sector_table2 = industrydatabystate.select(col("2012 NAICS code").alias('naics'),
                                         col('Meaning of 2012 NAICS code').alias('name'))
sector_table = sector_table1.union(sector_table2).distinct()
sector_table.head(2)
sector_table.write.mode('overwrite').save('output/sector_table',  format='parquet')

In [11]:
# STATE dimension table
state_table1 = statepopulation.select(col('population'), col('state/region').alias('id'))
state_table2 = stateabbrevs.select('state', 'abbreviation')
state_table = state_table1.join(state_table2, state_table1.id==state_table2.abbreviation).select(col('id'), col('state').alias('name'), col('population')).distinct()
state_table.head(2)
state_table.write.mode('overwrite').save('output/state_table',  format='parquet')

In [12]:
# CITY dimension table
city_table = uscities.select(col('city').alias('name'), 'state_id', 'population').distinct().withColumn("idx",monotonically_increasing_id())
city_table.head(2)
city_table.write.mode('overwrite').save('output/city_table',  format='parquet')

In [13]:
# COMPANY dimension table
company_table = companies_sorted.select('name', 'domain', col('year founded').alias('year'))
company_table.head()
company_table.write.mode('overwrite').save('output/company_table',  format='parquet')

## Fact table - Economic metrics

In [14]:
# ECONOMICS fact table

eco_table1 = ecocensusbyindustry.select(
    col('Year').alias('year'),
    col('Number of establishments').alias('nb_of_establishments'),
    col('Value of sales, shipments, receipts, revenue, or business done ($1,000)').alias('revenue'),
    col('2007 NAICS code').alias('naics'),
    col('Number of paid employees for period including March 12').alias('employees'),
    lit('US').alias('state_id'),
    lit(None).alias('growth'),
    lit(None).alias('city_id'))

eco_table2 = industrydatabystate.select(
    lit(2012).alias('year'),
    col('Number of establishments').alias('nb_of_establishments'),
    col('Number of employees').alias('employees'),
    col(' Nonemployer value of sales, shipments, receipts, revenue, or business done ($1,000) ').alias('revenue'),
    col('2012 NAICS code').alias('naics'),
    col('Geographic area name').alias('state')).distinct()
eco_table2 = eco_table2.join(
    state_table, 
    state_table.id==eco_table2.state).select(
        'year',
        'nb_of_establishments',
        'employees',
        'revenue',
        'naics',
        col('id').alias('state_id'),
        lit(None).alias('growth'),
        lit(None).alias('city_id')).distinct()

eco_table3 = ecoinc5000.join(
    city_table, 
    city_table.name==ecoinc5000.city
    ).join(
    state_table,
    state_table.id==ecoinc5000.state
    ).join(
    sector_table,
    sector_table.name==ecoinc5000.industry).select(
    lit(2019).alias('year'),
    col('workers').alias('employees'),
    lit(1).alias('nb_of_establishments'),
    'revenue',
    'growth',
    col('id').alias('state_id'),
    col('naics'),
    col('idx').alias('city_id')).distinct()
eco_table = eco_table1.union(eco_table2).union(eco_table3)
eco_table.head(4)
eco_table.write.mode('overwrite').save('output/eco_table',  format='parquet')

# Example of requests (used in App)

In [15]:
sector_table5 = spark.read.parquet('./output/sector_table')
for index, sectorrow in sector_table5.limit(5).collect():
    print(index)

21
238150
238910
311330
316993


In [16]:
for staterow in state_table.limit(20).collect():
    print(staterow)

Row(id='CT', name='Connecticut', population='3484336')
Row(id='ID', name='Idaho', population='1612136')
Row(id='ID', name='Idaho', population='426076')
Row(id='LA', name='Louisiana', population='1108728')
Row(id='MD', name='Maryland', population='5928814')
Row(id='MN', name='Minnesota', population='1280557')
Row(id='MS', name='Mississippi', population='768418')
Row(id='NH', name='New Hampshire', population='1109929')
Row(id='NH', name='New Hampshire', population='307292')
Row(id='NJ', name='New Jersey', population='8836639')
Row(id='OR', name='Oregon', population='793435')
Row(id='RI', name='Rhode Island', population='1015960')
Row(id='SD', name='South Dakota', population='203145')
Row(id='UT', name='Utah', population='873019')
Row(id='WA', name='Washington', population='1517527')
Row(id='CA', name='California', population='32987675')
Row(id='FL', name='Florida', population='15759421')
Row(id='GA', name='Georgia', population='7328413')
Row(id='MA', name='Massachusetts', population='617

In [28]:
eco_tableX = spark.read.parquet('./output/eco_table')
eco = eco_tableX.where(eco_tableX.naics == 21).where(eco_tableX.state_id == 'US').first()
print('''text'''+eco['employees']+'''text''')

text848,189text
