In [1]:
# Setup (one-time, before running this notebook):
# (1) Download the Databricks spark-xml library from
#     https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.12.0/spark-xml_2.12-0.12.0.jar
# (2) Upload the jar file to the HDFS root
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# HDFS directory used as the Spark SQL / Hive warehouse.
warehouse_location = 'hdfs://hdfs-nn:9000/warehouse'

# Build (or reuse) a Hive-enabled session that points at the metastore
# service and loads the spark-xml jar from HDFS.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.jars", "hdfs://hdfs-nn:9000/spark-xml_2.12-0.12.0.jar")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Load the Bronze-layer "politics.xml" dataset through the spark-xml reader.
hdfs_path = "hdfs://hdfs-nn:9000/project/Bronze/Politics/politics.xml"

# Every <RESULT> element becomes one row of the DataFrame.
xml_reader = spark.read.format("com.databricks.spark.xml").options(
    rootTag="PRESIDENTIAL",
    rowTag="RESULT",
)
df = xml_reader.load(hdfs_path)

In [3]:
# Normalise the text columns to title case (lowercase first, then initcap):
#   - "state" is rewritten in place
#   - "party_simplified" is renamed to "party" and then rewritten the same way
state_title_case = initcap(lower(col("state")))
party_title_case = initcap(lower(col("party")))

df = (
    df.withColumn("state", state_title_case)
    .withColumnRenamed("party_simplified", "party")
    .withColumn("party", party_title_case)
)

In [4]:
# Keep every column except the ones that are not needed downstream.
columns_to_drop = {
    'candidate',
    'notes',
    'office',
    'party_detailed',
    'state_cen',
    'state_ic',
    'state_po',
    'version',
    'writein',
}
kept_columns = [name for name in df.columns if name not in columns_to_drop]
df = df.select(kept_columns)

In [5]:
# Collapse rows that share the same grouping keys by summing "candidatevotes";
# the aggregate column comes out named "sum(candidatevotes)".
group_keys = ["state_fips", "year", "state", "party", "totalvotes"]
df = df.groupBy(*group_keys).sum("candidatevotes")

In [6]:
# (1) Rename the aggregate column "sum(candidatevotes)" back to "candidatevotes"
# (2) Sort ascending by "year", "state_fips", "party" and "candidatevotes"
# (3) Drop "state_fips", which is only needed here as a sort key
cols = ["year", "state_fips", "party", "candidatevotes"]
df = (
    df.withColumnRenamed("sum(candidatevotes)", "candidatevotes")
    .orderBy(*cols, ascending=True)
    .drop("state_fips")
)

In [7]:
# Cast the year and both vote-count columns to integers, one column at a time.
for int_column in ("year", "candidatevotes", "totalvotes"):
    df = df.withColumn(int_column, col(int_column).cast("int"))

In [8]:
# Put the columns into their final presentation order.
ordered_columns = ["year", "state", "party", "candidatevotes", "totalvotes"]
df = df.select(*ordered_columns)

In [9]:
# Preview the first rows of the cleaned DataFrame.
df.show()

+----+----------+-----------+--------------+----------+
|year|     state|      party|candidatevotes|totalvotes|
+----+----------+-----------+--------------+----------+
|1976|   Alabama|   Democrat|        659170|   1182850|
|1976|   Alabama|Libertarian|          1481|   1182850|
|1976|   Alabama|      Other|         18129|   1182850|
|1976|   Alabama| Republican|        504070|   1182850|
|1976|    Alaska|   Democrat|         44058|    123574|
|1976|    Alaska|Libertarian|          6785|    123574|
|1976|    Alaska|      Other|          1176|    123574|
|1976|    Alaska| Republican|         71555|    123574|
|1976|   Arizona|   Democrat|        295602|    742719|
|1976|   Arizona|Libertarian|          7647|    742719|
|1976|   Arizona|      Other|         20828|    742719|
|1976|   Arizona| Republican|        418642|    742719|
|1976|  Arkansas|   Democrat|        498604|    767535|
|1976|  Arkansas|      Other|          1028|    767535|
|1976|  Arkansas| Republican|        267903|    

In [10]:
# Check the column types produced by the integer casts.
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- party: string (nullable = true)
 |-- candidatevotes: integer (nullable = true)
 |-- totalvotes: integer (nullable = true)



In [11]:
# Turn the integer "year" into text.
# NOTE(review): presumably an intermediate step so that the later cast to a
# date parses the year string -- confirm.
year_as_text = df["year"].cast("string")
df = df.withColumn("year", year_as_text)

df.show()

+----+----------+-----------+--------------+----------+
|year|     state|      party|candidatevotes|totalvotes|
+----+----------+-----------+--------------+----------+
|1976|   Alabama|   Democrat|        659170|   1182850|
|1976|   Alabama|Libertarian|          1481|   1182850|
|1976|   Alabama|      Other|         18129|   1182850|
|1976|   Alabama| Republican|        504070|   1182850|
|1976|    Alaska|   Democrat|         44058|    123574|
|1976|    Alaska|Libertarian|          6785|    123574|
|1976|    Alaska|      Other|          1176|    123574|
|1976|    Alaska| Republican|         71555|    123574|
|1976|   Arizona|   Democrat|        295602|    742719|
|1976|   Arizona|Libertarian|          7647|    742719|
|1976|   Arizona|      Other|         20828|    742719|
|1976|   Arizona| Republican|        418642|    742719|
|1976|  Arkansas|   Democrat|        498604|    767535|
|1976|  Arkansas|      Other|          1028|    767535|
|1976|  Arkansas| Republican|        267903|    

In [12]:
from pyspark.sql.types import *
# Convert the "year" text to a date column; the captured output shows each
# year rendered as January 1st of that year (e.g. 1976 -> 1976-01-01).
year_as_date = col("year").cast(DateType())
df = df.withColumn("year", year_as_date)

# Confirm the new column type and preview the converted values.
df.printSchema()
df.show()

root
 |-- year: date (nullable = true)
 |-- state: string (nullable = true)
 |-- party: string (nullable = true)
 |-- candidatevotes: integer (nullable = true)
 |-- totalvotes: integer (nullable = true)

+----------+----------+-----------+--------------+----------+
|      year|     state|      party|candidatevotes|totalvotes|
+----------+----------+-----------+--------------+----------+
|1976-01-01|   Alabama|   Democrat|        659170|   1182850|
|1976-01-01|   Alabama|Libertarian|          1481|   1182850|
|1976-01-01|   Alabama|      Other|         18129|   1182850|
|1976-01-01|   Alabama| Republican|        504070|   1182850|
|1976-01-01|    Alaska|   Democrat|         44058|    123574|
|1976-01-01|    Alaska|Libertarian|          6785|    123574|
|1976-01-01|    Alaska|      Other|          1176|    123574|
|1976-01-01|    Alaska| Republican|         71555|    123574|
|1976-01-01|   Arizona|   Democrat|        295602|    742719|
|1976-01-01|   Arizona|Libertarian|          7647|  

In [13]:
# Lookup from title-cased state name (as produced by initcap, hence
# "District Of Columbia") to its two-letter postal abbreviation.
us_state_abbrev = {
    'Alabama': 'AL',
    'Alaska': 'AK',
    'Arizona': 'AZ',
    'Arkansas': 'AR',
    'California': 'CA',
    'Colorado': 'CO',
    'Connecticut': 'CT',
    'Delaware': 'DE',
    'District Of Columbia': 'DC',
    'Florida': 'FL',
    'Georgia': 'GA',
    'Hawaii': 'HI',
    'Idaho': 'ID',
    'Illinois': 'IL',
    'Indiana': 'IN',
    'Iowa': 'IA',
    'Kansas': 'KS',
    'Kentucky': 'KY',
    'Louisiana': 'LA',
    'Maine': 'ME',
    'Maryland': 'MD',
    'Massachusetts': 'MA',
    'Michigan': 'MI',
    'Minnesota': 'MN',
    'Mississippi': 'MS',
    'Missouri': 'MO',
    'Montana': 'MT',
    'Nebraska': 'NE',
    'Nevada': 'NV',
    'New Hampshire': 'NH',
    'New Jersey': 'NJ',
    'New Mexico': 'NM',
    'New York': 'NY',
    'North Carolina': 'NC',
    'North Dakota': 'ND',
    'Ohio': 'OH',
    'Oklahoma': 'OK',
    'Oregon': 'OR',
    'Pennsylvania': 'PA',
    'Rhode Island': 'RI',
    'South Carolina': 'SC',
    'South Dakota': 'SD',
    'Tennessee': 'TN',
    'Texas': 'TX',
    'Utah': 'UT',
    'Vermont': 'VT',
    'Virginia': 'VA',
    'Washington': 'WA',
    'West Virginia': 'WV',
    'Wisconsin': 'WI',
    'Wyoming': 'WY',
}

# Inverse lookup: two-letter abbreviation -> state name.
abbrev_us_state = {
    abbrev: state_name for state_name, abbrev in us_state_abbrev.items()
}

# Swap the full state names in the "state" column for their two-letter
# abbreviations using the us_state_abbrev lookup.
df = df.replace(us_state_abbrev, subset=["state"])

# Collect the result to the driver as a pandas DataFrame for display.
df.toPandas()

Unnamed: 0,year,state,party,candidatevotes,totalvotes
0,1976-01-01,AL,Democrat,659170,1182850
1,1976-01-01,AL,Libertarian,1481,1182850
2,1976-01-01,AL,Other,18129,1182850
3,1976-01-01,AL,Republican,504070,1182850
4,1976-01-01,AK,Democrat,44058,123574
...,...,...,...,...,...
2349,2020-01-01,WI,Republican,1610184,3298041
2350,2020-01-01,WY,Democrat,73491,278503
2351,2020-01-01,WY,Libertarian,5768,278503
2352,2020-01-01,WY,Other,5685,278503


In [14]:
# Persist the cleaned data as parquet under the warehouse database directory,
# with one sub-directory per "year" value. The DataFrame is first collapsed
# to a single partition, and any existing output at the path is overwritten.
(
    df.repartition(1)
    .write
    .partitionBy("year")
    .format("parquet")
    .mode("overwrite")
    .save("hdfs://hdfs-nn:9000/warehouse/americancrimes.db/politics")
)