In [1]:
import numpy as np
import pandas as pd
import os
import warnings
from pathlib import Path
warnings.filterwarnings('ignore')


In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
import time

#create a session
spark = SparkSession.builder\
    .appName('SparkSQL')\
    .config('spark.sql.debug.maxToStringFields', 2000)\
    .config('spark.driver.memory', '2g')\
    .getOrCreate()

from pyspark import SparkFiles

# filePath = Path(r'/Users/flynnlives/Documents/GitHub/Project_4_Group_2/resources/spending_2005-2022.csv')
# spark.sparkContext.addFile(filePath)

cols = ['Chicago', 'Detroit', 'Minneapolis-St.Paul','Cleveland', 'St. Louis', 'New York',
       'Philadelphia', 'Boston','Pittsburgh', 'Washington, D.C.', 'Baltimore', 'Atlanta',
       'Miami', 'Dallas-Fort Worth', 'Houston', 'Los Angeles',
       'San Francisco', 'San Diego','Portland', 'Seattle', 'Honolulu', 'Anchorage', 'Phoenix', 'Denver','Year', 'Tampa']

schema = [StructField('Item', StringType(), True)]
for i in cols:
    schema.append(StructField(i, IntegerType(), True))
schema


final = StructType(fields=schema)
final

consumerSpendDF = spark.read.csv('resources/spending_2005-2022.csv', sep=',', header=True, ignoreLeadingWhiteSpace=True, schema = final)
consumerSpendDF.show()

## Alternate way using inferschema

# consumerSpendDF = spark.read.csv('resources/spending_2005-2022.csv', sep=',', header=True, ignoreLeadingWhiteSpace=True, inferSchema = True)
# consumerSpendDF.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 19:28:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/11 19:28:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Item, Chicago, Detroit, Minneapolis-St.Paul, Cleveland, St. Louis, New York, Philadelphia, Boston, Pittsburgh, Washington, D.C., Baltimore, Atlanta, Miami, Dallas-Fort Worth, Houston, Los Angeles, San Francisco, San Diego, Portland, Seattle, Honolulu, Anchorage, Phoenix, Denver, Tampa, Year
 Schema: Item, Chicago, Detroit, Minneapolis-St.Paul, Cleveland, St. Louis, New York, Philadelphia, Boston, Pittsburgh, Washington, D.C., Baltimore, Atlanta, Miami, Dallas-Fort Worth, Houston, Los Angeles, San Francisco, San Diego, Portland, Seattle, Honolulu, Anchorage, Phoenix, Denver, Year, Tampa
Expected: Year but found: Tampa
CSV file:

+--------------------+-------+-------+-------------------+---------+---------+--------+------------+------+----------+----------------+---------+-------+-----+-----------------+-------+-----------+-------------+---------+--------+-------+--------+---------+-------+------+----+-----+
|                Item|Chicago|Detroit|Minneapolis-St.Paul|Cleveland|St. Louis|New York|Philadelphia|Boston|Pittsburgh|Washington, D.C.|Baltimore|Atlanta|Miami|Dallas-Fort Worth|Houston|Los Angeles|San Francisco|San Diego|Portland|Seattle|Honolulu|Anchorage|Phoenix|Denver|Year|Tampa|
+--------------------+-------+-------+-------------------+---------+---------+--------+------------+------+----------+----------------+---------+-------+-----+-----------------+-------+-----------+-------------+---------+--------+-------+--------+---------+-------+------+----+-----+
|Meats, poultry, f...|    815|   1047|                713|      726|      854|    1109|         963|   998|       681|             766|      737|   

23/12/11 19:28:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
consumerSpendDF.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Chicago: integer (nullable = true)
 |-- Detroit: integer (nullable = true)
 |-- Minneapolis-St.Paul: integer (nullable = true)
 |-- Cleveland: integer (nullable = true)
 |-- St. Louis: integer (nullable = true)
 |-- New York: integer (nullable = true)
 |-- Philadelphia: integer (nullable = true)
 |-- Boston: integer (nullable = true)
 |-- Pittsburgh: integer (nullable = true)
 |-- Washington, D.C.: integer (nullable = true)
 |-- Baltimore: integer (nullable = true)
 |-- Atlanta: integer (nullable = true)
 |-- Miami: integer (nullable = true)
 |-- Dallas-Fort Worth: integer (nullable = true)
 |-- Houston: integer (nullable = true)
 |-- Los Angeles: integer (nullable = true)
 |-- San Francisco: integer (nullable = true)
 |-- San Diego: integer (nullable = true)
 |-- Portland: integer (nullable = true)
 |-- Seattle: integer (nullable = true)
 |-- Honolulu: integer (nullable = true)
 |-- Anchorage: integer (nullable = true)
 |-- Phoenix: intege

In [4]:
consumerSpendDF.createOrReplaceTempView('consumerSpend')


In [17]:
start_time = time.time()

spendYear = spark.sql("""
select 
    year
from consumerSpend
where `Los Angeles` >=10000
order by year desc
""").show()
### need to use ` (backtick) for spaces

print("--- %s seconds ---" % (time.time() - start_time))

+----+
|year|
+----+
|2022|
|2022|
|2021|
|2021|
|2020|
|2020|
|2019|
|2019|
|2018|
|2017|
|2016|
|2016|
|2015|
|2014|
|2013|
|2012|
|2011|
|2010|
|2009|
|2008|
+----+
only showing top 20 rows

--- 0.11617207527160645 seconds ---


In [3]:
os.listdir('spendings')

['cu-msa-south-2-year-average-2013.xlsx',
 'cu-msa-midwest-2-year-average-2019.xlsx',
 'cu-msa-south-2-year-average-2005.xlsx',
 'cu-msa-northeast-2-year-average-2021.xlsx',
 'cu-msa-west-2-year-average-2016.xlsx',
 'cu-msa-midwest-2-year-average-2015.xlsx',
 'cu-msa-west-2-year-average-2020.xlsx',
 'cu-msa-south-2-year-average-2009.xlsx',
 'cu-msa-northeast-2-year-average-2017.xlsx',
 'cu-msa-northeast-2-year-average-2016.xlsx',
 'cu-msa-west-2-year-average-2021.xlsx',
 'cu-msa-south-2-year-average-2008.xlsx',
 'cu-msa-midwest-2-year-average-2014.xlsx',
 'cu-msa-west-2-year-average-2017.xlsx',
 'cu-msa-northeast-2-year-average-2020.xlsx',
 'cu-msa-midwest-2-year-average-2022.xlsx',
 'cu-msa-south-2-year-average-2012.xlsx',
 'cu-msa-midwest-2-year-average-2018.xlsx',
 'cu-msa-northeast-2-year-average-2007.xlsx',
 'cu-msa-midwest-2-year-average-2013.xlsx',
 'cu-msa-south-2-year-average-2019.xlsx',
 'cu-msa-midwest-2-year-average-2005.xlsx',
 'cu-msa-northeast-2-year-average-2011.xlsx',


In [4]:
startYear = 2005
endYear = 2022
regions = ['midwest','northeast','south','west']
years = np.arange(startYear,endYear+1)
catLst = ['Meats, poultry, fish, and eggs',
'Cereals and bakery products',
'Dairy products',
'Fruits and vegetables',
'Other food at home',
'Food away from home',
'Alcoholic beverages',
'Housing',
'Apparel and services',
'Transportation',
'Healthcare',
'Entertainment',
'Personal care products and services',
'Reading',
'Education',
'Tobacco products and smoking supplies',
'Miscellaneous',
'Cash contributions',
'Personal insurance and pensions']
### city names to replace
cityNames = {'Minneapolis-\nSt. Paul':'Minneapolis-St.Paul',
       'Washington,\nD.C.':'Washington, D.C.','Washington\nD.C.':'Washington, D.C.',
       'Dallas- Fort Worth':'Dallas-Fort Worth','Dallas-\nFort Worth':'Dallas-Fort Worth',
       'Los\nAngeles':'Los Angeles', 'San\nFrancisco':'San Francisco'}
### category names to replace
catNames = {'Health care':'Healthcare'}

In [5]:
final = pd.DataFrame()
for y in years:
    first = True
    for r in regions:
        temp = pd.read_excel(f'spendings/cu-msa-{r}-2-year-average-{y}.xlsx').dropna().reset_index(drop=True)
        temp.columns = temp.iloc[0]
        temp['Item'] = [i.strip() for i in temp['Item']]
        temp.set_index('Item',inplace=True)
        temp.rename(cityNames,axis='columns',errors='ignore',inplace=True)
        temp.rename(index=catNames,errors='ignore',inplace=True)  
        temp = temp.loc[catLst]     
        ### df is done
        if first:
            master = temp
            first = False
        else:
            master = pd.merge(master,temp,left_index=True,right_index=True,how ='outer')
    master['Year'] = np.full(master.shape[0],y)
    cols = [i for i in master.columns if 'All ' not in i]
    master = master[cols]
    ## save individual dfs
    master.to_csv(f'resources/spending_{y}.csv')
    final = pd.concat([final,master],axis = 0)
final
final.to_csv(f'resources/spending_{startYear}-{endYear}.csv')
        


In [6]:
master

Unnamed: 0_level_0,Chicago,Detroit,Minneapolis-St.Paul,St. Louis,New York,Philadelphia,Boston,"Washington, D.C.",Baltimore,Atlanta,...,Tampa,Los Angeles,San Francisco,San Diego,Seattle,Phoenix,Denver,Honolulu,Anchorage,Year
Item,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
"Meats, poultry, fish, and eggs",1269,1184,1100,1170,1534,1269,1198,1391,1398,1260,...,1019,1311,1789,1210,1190,982,1239,1731,1576,2022
Cereals and bakery products,719,695,680,702,818,809,911,805,790,714,...,650,713,1053,720,705,604,605,1072,1145,2022
Dairy products,492,510,571,534,664,558,577,613,554,477,...,522,502,740,563,620,466,530,590,725,2022
Fruits and vegetables,1115,1055,1241,1045,1460,1237,1414,1261,1329,1204,...,906,1217,1958,1416,1368,870,1103,1703,1931,2022
Other food at home,2035,2051,2317,2252,2163,2252,2461,2488,2102,2096,...,1768,1937,2815,2334,2678,1892,1861,2699,3183,2022
Food away from home,3741,2968,3507,3817,4004,4366,4312,4768,4488,3322,...,3156,4495,4781,4205,3934,3517,4538,5146,3611,2022
Alcoholic beverages,907,655,1098,789,788,871,1100,929,692,600,...,799,720,998,900,1269,815,898,559,831,2022
Housing,27212,21746,27591,22255,31213,28453,31679,33285,26047,24914,...,23686,28300,41367,31853,33090,24843,30893,27799,25141,2022
Apparel and services,2534,1744,1823,1772,2699,3597,1807,1965,2296,2442,...,1516,2126,2199,2549,2398,1834,1781,1411,2083,2022
Transportation,9569,12566,10018,12262,10326,11247,11697,12162,14457,12354,...,11940,13112,12991,12619,14481,11736,12994,10103,13745,2022


In [7]:
master.columns

Index(['Chicago', 'Detroit', 'Minneapolis-St.Paul', 'St. Louis', 'New York',
       'Philadelphia', 'Boston', 'Washington, D.C.', 'Baltimore', 'Atlanta',
       'Miami', 'Dallas-Fort Worth', 'Houston', 'Tampa', 'Los Angeles',
       'San Francisco', 'San Diego', 'Seattle', 'Phoenix', 'Denver',
       'Honolulu', 'Anchorage', 'Year'],
      dtype='object', name=0)