In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import split, col, create_map, lit

from itertools import chain

In [2]:
credentials_location = '/Users/alvin/.google/credentials/google_credentials.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('de-project-spark-batch') \
    .set("spark.jars", "/opt/homebrew/Cellar/apache-spark/3.2.1/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

22/05/01 19:28:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [5]:
df_emp = spark.read \
    .option("header", "true") \
    .parquet('gs://de-project-data-lake_intense-glow-347320/raw/*')

                                                                                

In [6]:
df_emp.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------------+----------+-----------+--------------------+----+-----------+--------+----------+-----------------+-----+-----------------+
|      LAUS_code|state_fips|county_fips|        county_state|year|labor_force|employed|unemployed|unemployment_rate| fips|__index_level_0__|
+---------------+----------+-----------+--------------------+----+-----------+--------+----------+-----------------+-----+-----------------+
|CN0100100000000|        01|        001|  Autauga County, AL|2009|      24660|   22464|      2196|              8.9|01001|                5|
|CN0100300000000|        01|        003|  Baldwin County, AL|2009|      82314|   74950|      7364|              8.9|01003|                6|
|CN0100500000000|        01|        005|  Barbour County, AL|2009|       9944|    8635|      1309|             13.2|01005|                7|
|CN0100700000000|        01|        007|     Bibb County, AL|2009|       8696|    7637|      1059|             12.2|01007|                8|
|CN0100900000

                                                                                

In [7]:
df_emp = df_emp.withColumn('state', split(df_emp['county_state'], ', ').getItem(1)) \
    .withColumn('county', split(df_emp['county_state'], ', ').getItem(0))

In [8]:
states = {
    'AK': 'Alaska',
    'AL': 'Alabama',
    'AR': 'Arkansas',
    'AZ': 'Arizona',
    'CA': 'California',
    'CO': 'Colorado',
    'CT': 'Connecticut',
    'DC': 'District of Columbia',
    'DE': 'Delaware',
    'FL': 'Florida',
    'GA': 'Georgia',
    'HI': 'Hawaii',
    'IA': 'Iowa',
    'ID': 'Idaho',
    'IL': 'Illinois',
    'IN': 'Indiana',
    'KS': 'Kansas',
    'KY': 'Kentucky',
    'LA': 'Louisiana',
    'MA': 'Massachusetts',
    'MD': 'Maryland',
    'ME': 'Maine',
    'MI': 'Michigan',
    'MN': 'Minnesota',
    'MO': 'Missouri',
    'MS': 'Mississippi',
    'MT': 'Montana',
    'NC': 'North Carolina',
    'ND': 'North Dakota',
    'NE': 'Nebraska',
    'NH': 'New Hampshire',
    'NJ': 'New Jersey',
    'NM': 'New Mexico',
    'NV': 'Nevada',
    'NY': 'New York',
    'OH': 'Ohio',
    'OK': 'Oklahoma',
    'OR': 'Oregon',
    'PA': 'Pennsylvania',
    'RI': 'Rhode Island',
    'SC': 'South Carolina',
    'SD': 'South Dakota',
    'TN': 'Tennessee',
    'TX': 'Texas',
    'UT': 'Utah',
    'VA': 'Virginia',
    'VT': 'Vermont',
    'WA': 'Washington',
    'WI': 'Wisconsin',
    'WV': 'West Virginia',
    'WY': 'Wyoming'
}

In [9]:
mapping_expr = create_map([lit(x) for x in chain(*states.items())])
df_emp = df_emp.withColumn('state_full', mapping_expr[df_emp.state])

In [10]:
df_emp = df_emp.select("year", "LAUS_code", "fips", "state_fips", "county_fips", "state", "state_full", "county", "labor_force", "employed", "unemployed", "unemployment_rate")

In [17]:
df_emp.printSchema()

root
 |-- year: long (nullable = true)
 |-- LAUS_code: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- state_fips: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_full: string (nullable = true)
 |-- county: string (nullable = true)
 |-- labor_force: long (nullable = true)
 |-- employed: long (nullable = true)
 |-- unemployed: long (nullable = true)
 |-- unemployment_rate: double (nullable = true)



In [25]:
df_emp.withColumn("year", df_emp["year"].cast(types.IntegerType))

TypeError: unexpected type: <class 'pyspark.sql.types.DataTypeSingleton'>

In [13]:
df_emp.show()

[Stage 4:>                                                          (0 + 1) / 1]

+----------------+---------------+-----+----------+-----------+-----+----------+----------------+-----------+--------+----------+-----------------+
|            year|      LAUS_code| fips|state_fips|county_fips|state|state_full|          county|labor_force|employed|unemployed|unemployment_rate|
+----------------+---------------+-----+----------+-----------+-----+----------+----------------+-----------+--------+----------+-----------------+
|1230768000000000|CN0100100000000|01001|        01|        001|   AL|   Alabama|  Autauga County|      24660|   22464|      2196|              8.9|
|1230768000000000|CN0100300000000|01003|        01|        003|   AL|   Alabama|  Baldwin County|      82314|   74950|      7364|              8.9|
|1230768000000000|CN0100500000000|01005|        01|        005|   AL|   Alabama|  Barbour County|       9944|    8635|      1309|             13.2|
|1230768000000000|CN0100700000000|01007|        01|        007|   AL|   Alabama|     Bibb County|       8696|   

                                                                                

In [26]:
df_emp.write.csv('/Users/alvin/Documents/data/csv/', mode='overwrite', header=True)

                                                                                