The goal of this notebook is to map the metadata and data in bronze to a silver table, with different rules for each of the four types of values.

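In particular:
- small categorical features are one-hot encoded (so 1 column with 7 types becomes 7 columns)
- binary values are mapped to True, optionally False, and optionally NULL
- large categorical and number fields get no special treatment yet; only their metadata is stored for later use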

In [3]:
%load 

'/home/jonoepd/Projects/census/spark/notebooks'

In [1]:
%%sql

-- Show the column names and types of the bronze ddm table.
-- NOTE(review): the saved output of this cell is "Cell magic `%%sql` not found",
-- so the extension providing %%sql must be loaded before this cell runs.
DESCRIBE TABLE nessie.bronze.ddm

UsageError: Cell magic `%%sql` not found.


In [2]:
# Spark DataFrames come with helpers that feel like a blend of sqlalchemy and pandas.
# "T" below is only a scratch column name (it holds the zipped code/text pairs),
# not some special keyword.
import pyspark.sql.functions as pys_fn

# Try the query out: pair each code with its text, then emit one row per pair.
df = spark.sql("SELECT * FROM nessie.bronze.ddm WHERE cardinality = 'small categorical'")
code_text_pair = pys_fn.explode(pys_fn.arrays_zip("codes", "texts"))
df.withColumn("T", code_text_pair).select(
    "field",
    pys_fn.col("T.codes"),
    pys_fn.col("T.texts"),
).show()

NameError: name 'spark' is not defined

In [153]:
# The trial query works, so narrow things down to the columns we actually care about:
# the ones listed in metadata.METADATA_LIST.

import metadata

# TODO: this list is needed in many places; consider wrapping it in a helper function.
md_cols = [entry.column for entry in metadata.METADATA_LIST]

In [112]:
# Same query as above, now restricted to the fields listed in md_cols.
# This time it is written with the DataFrame API instead of a SQL string:
# column expressions read a lot like pandas boolean masks, and the
# where/select/sort chain reads like a sqlalchemy query.
# (Plain Spark DataFrames are filtered with where()/filter() or df[condition];
# .loc/.iloc belong to the pandas-on-Spark API, not to this class.)

from pyspark.sql.functions import col
df = spark.table("nessie.bronze.ddm")
df_expl = (
    df.filter((df.cardinality == 'small categorical') & df.field.isin(md_cols))
    # zip codes with texts, then emit one row per (code, text) pair
    .withColumn("T", pys_fn.explode(pys_fn.arrays_zip("codes", "texts")))
    .select("field", col("T.codes"), col("T.texts"))
    .sort("field")
)
df_expl.show()
# Register the result so the following SQL cell can query it by name.
df_expl.createOrReplaceTempView("df_expl")

+-----+--------+--------------------+
|field|   codes|               texts|
+-----+--------+--------------------+
| DRAT|null_str|N/A (No service-c...|
| DRAT|       1|           0 percent|
| DRAT|       2|    10 or 20 percent|
| DRAT|       3|    30 or 40 percent|
| DRAT|       4|    50 or 60 percent|
| DRAT|       5|70, 80, 90, or 10...|
| DRAT|       6|        Not reported|
|  ENG|null_str|N/A (less than 5 ...|
|  ENG|       1|           Very well|
|  ENG|       2|                Well|
|  ENG|       3|            Not well|
|  ENG|       4|          Not at all|
|  MAR|       1|             Married|
|  MAR|       2|             Widowed|
|  MAR|       3|            Divorced|
|  MAR|       4|           Separated|
|  MAR|       5|Never married or ...|
|  MIL|null_str|N/A (less than 17...|
|  MIL|       1|  Now on active duty|
|  MIL|       2|On active duty in...|
+-----+--------+--------------------+
only showing top 20 rows



In [140]:
# Build the small-categorical metadata from the df_expl temp view (the cell that
# registers that view must have run first). One row per (field, code) pair:
#   silver_column  "<field>_<code>", or "<field>_NULL" for the 'null_str' code
#   code           the code cast to int (NULL for 'null_str')
#   text           the description of the code
# String literals use single quotes: double-quoted text is parsed as an identifier
# when spark.sql.ansi.doubleQuotedIdentifiers is enabled. The CAST gets an explicit
# alias so the output column name does not depend on Spark's auto-naming.
df_smallcat = spark.sql("""
SELECT
    CONCAT(field, '_', COALESCE(code, 'NULL')) AS silver_column,
    field,
    CAST(code AS int) AS code,
    text
FROM (
    SELECT
        field,
        CASE codes WHEN 'null_str' THEN NULL ELSE codes END AS code,
        texts AS text
    FROM df_expl
)
""")
# we will use this df to generate more features for our silver table
df_smallcat.show()

+-------------+-----+----+--------------------+
|silver_column|field|code|                text|
+-------------+-----+----+--------------------+
|    DRAT_NULL| DRAT|NULL|N/A (No service-c...|
|       DRAT_1| DRAT|   1|           0 percent|
|       DRAT_2| DRAT|   2|    10 or 20 percent|
|       DRAT_3| DRAT|   3|    30 or 40 percent|
|       DRAT_4| DRAT|   4|    50 or 60 percent|
|       DRAT_5| DRAT|   5|70, 80, 90, or 10...|
|       DRAT_6| DRAT|   6|        Not reported|
|     ENG_NULL|  ENG|NULL|N/A (less than 5 ...|
|        ENG_1|  ENG|   1|           Very well|
|        ENG_2|  ENG|   2|                Well|
|        ENG_3|  ENG|   3|            Not well|
|        ENG_4|  ENG|   4|          Not at all|
|        MAR_1|  MAR|   1|             Married|
|        MAR_2|  MAR|   2|             Widowed|
|        MAR_3|  MAR|   3|            Divorced|
|        MAR_4|  MAR|   4|           Separated|
|        MAR_5|  MAR|   5|Never married or ...|
|     MIL_NULL|  MIL|NULL|N/A (less than

In [130]:
# Make sure the target namespaces exist: silver itself, plus a nested "md"
# namespace that keeps the metadata tables apart from the rest of silver.
for ddl in (
    "CREATE NAMESPACE IF NOT EXISTS nessie.silver",
    "CREATE NAMESPACE IF NOT EXISTS nessie.silver.md",
):
    spark.sql(ddl)

# Create the table empty first, from the DataFrame itself: limit(0) keeps the
# schema but no rows. The rows are inserted in a separate step.
smallcat_schema_only = df_smallcat.limit(0)
smallcat_schema_only.writeTo("nessie.silver.md.smallcat").createOrReplace()

# Confirm the resulting column names and types.
spark.sql("DESCRIBE TABLE nessie.silver.md.smallcat").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|silver_column|   string|   NULL|
|        field|   string|   NULL|
|         code|      int|   NULL|
|         text|   string|   NULL|
+-------------+---------+-------+



In [131]:
# okay now we can insert
# createOrReplace() rather than append(): append() adds the rows again every time
# this cell is re-run, leaving duplicates in the table. Replacing the table with
# df_smallcat gives the same end state (same schema, same rows) and is safe to
# re-run, matching how the other nessie.silver.md tables are written.
df_smallcat.writeTo("nessie.silver.md.smallcat").createOrReplace()

In [132]:
# Read the table back and eyeball the first rows to confirm the insert worked.
smallcat_readback = spark.sql("SELECT * FROM nessie.silver.md.smallcat")
smallcat_readback.show(10)

+-------------+-----+----+--------------------+
|silver_column|field|code|                text|
+-------------+-----+----+--------------------+
|    DRAT_NULL| DRAT|NULL|N/A (No service-c...|
|       DRAT_1| DRAT|   1|           0 percent|
|       DRAT_2| DRAT|   2|    10 or 20 percent|
|       DRAT_3| DRAT|   3|    30 or 40 percent|
|       DRAT_4| DRAT|   4|    50 or 60 percent|
|       DRAT_5| DRAT|   5|70, 80, 90, or 10...|
|       DRAT_6| DRAT|   6|        Not reported|
|     ENG_NULL|  ENG|NULL|N/A (less than 5 ...|
|        ENG_1|  ENG|   1|           Very well|
|        ENG_2|  ENG|   2|                Well|
+-------------+-----+----+--------------------+
only showing top 10 rows



In [206]:
# okay done with the setup for small categorical columns.
import numpy as np


def split_binary_codes(codes, null_code="null_str", missing="NULL"):
    """Return ``(code_true, code_false)`` for one binary field's list of codes.

    A leading ``null_code`` entry is skipped. The first remaining code is the
    "true" value and the one after it is the "false" value.

    Args:
        codes: sequence of code strings for the field (may be empty or None).
        null_code: marker that, when it is the first entry, flags a nullable field.
        missing: placeholder returned for a value that is absent.

    Returns:
        Tuple ``(code_true, code_false)``. ``code_false`` is ``missing`` when there
        is no second code or when it would equal ``code_true``.
    """
    if codes is None:
        codes = []
    # Only a *leading* null marker shifts the positions of the true/false codes.
    offset = 1 if len(codes) > 0 and codes[0] == null_code else 0
    # Bounds are checked against the shifted positions, so a list such as
    # ["null_str", "1"] yields ("1", "NULL") instead of raising IndexError.
    code_true = codes[offset] if len(codes) > offset else missing
    code_false = codes[offset + 1] if len(codes) > offset + 1 else missing
    # guard against error if true and false are marked as same for some reason
    if code_false == code_true:
        code_false = missing
    return code_true, code_false


df_b = df.where(df.cardinality == 'binary')[df.field.isin(md_cols)]
# the logic here will be easier in pandas!
cols = ["field", "codes", "has_null", "texts"]
pdf_b = df_b.select(cols).toPandas()
# True for fields whose code list starts with the null marker.
null_idx = [
    codes is not None and len(codes) > 0 and codes[0] == "null_str"
    for codes in pdf_b["codes"]
]
code_pairs = [split_binary_codes(codes) for codes in pdf_b["codes"]]
# dtype=object stores the Python strings as they are. A default numpy string array
# is fixed-width, so writing "NULL" into an array of shorter codes would be
# silently truncated.
code_true = np.array([true_code for true_code, _ in code_pairs], dtype=object)
code_false = np.array([false_code for _, false_code in code_pairs], dtype=object)
pdf_b["code_true"] = code_true
pdf_b["code_false"] = code_false
pdf_b

Unnamed: 0,field,codes,has_null,texts,code_true,code_false
0,ADJINC,"[1042311, 1042311]",0,"[2022 factor (1.042311), 2022 factor (1.042311)]",1042311,
1,ADJINC,"[1042311, 1042311]",0,"[2022 factor (1.042311), 2022 factor (1.042311)]",1042311,
2,DEAR,"[1, 2]",0,"[Yes, No]",1,2.0
3,DEYE,"[1, 2]",0,"[Yes, No]",1,2.0
4,DIS,"[1, 2]",0,"[With a disability, Without a disability]",1,2.0
5,DOUT,"[null_str, 1, 2]",1,"[N/A (Less than 15 years old), Yes, No]",1,2.0
6,DPHY,"[null_str, 1, 2]",1,"[N/A (Less than 5 years old), Yes, No]",1,2.0
7,HINS1,"[1, 2]",0,"[Yes, No]",1,2.0
8,HINS2,"[1, 2]",0,"[Yes, No]",1,2.0
9,HINS3,"[1, 2]",0,"[Yes, No]",1,2.0


In [207]:
# Convert the pandas frame back into a Spark DataFrame and (re)create the
# binary metadata table from it.
df_binary_md = spark.createDataFrame(pdf_b)
df_binary_md.writeTo("nessie.silver.md.binary").createOrReplace()

# check everything is ok
binary_readback = spark.sql("SELECT * FROM nessie.silver.md.binary")
binary_readback.show(40)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+--------+------------------+--------+--------------------+---------+----------+
|   field|             codes|has_null|               texts|code_true|code_false|
+--------+------------------+--------+--------------------+---------+----------+
|  ADJINC|[1042311, 1042311]|       0|[2022 factor (1.0...|  1042311|      NULL|
|  ADJINC|[1042311, 1042311]|       0|[2022 factor (1.0...|  1042311|      NULL|
|    DEAR|            [1, 2]|       0|           [Yes, No]|        1|         2|
|    DEYE|            [1, 2]|       0|           [Yes, No]|        1|         2|
|     DIS|            [1, 2]|       0|[With a disabilit...|        1|         2|
|    DOUT|  [null_str, 1, 2]|       1|[N/A (Less than 1...|        1|         2|
|    DPHY|  [null_str, 1, 2]|       1|[N/A (Less than 5...|        1|         2|
|   HINS1|            [1, 2]|       0|           [Yes, No]|        1|         2|
|   HINS2|            [1, 2]|       0|           [Yes, No]|        1|         2|
|   HINS3|            [1, 2]

In [224]:
# Large categorical fields (number fields follow in the next cell): nothing fancy
# yet, the metadata columns are stored as they are.
# The largecat table is used later to compute category aggregates.
df_lc = (
    df.filter((df.cardinality == 'large categorical') & df.field.isin(md_cols))
    .select("field", "codes", "texts", "name_text")
)
df_lc.writeTo("nessie.silver.md.largecat").createOrReplace()
df_lc.show(5)

+--------+--------------------+--------------------+--------------------+
|   field|               codes|               texts|           name_text|
+--------+--------------------+--------------------+--------------------+
|   ANC1P|[1, 3, 5, 8, 9, 1...|[Alsatian, Austri...|Recoded Detailed ...|
|   ANC2P|[1, 3, 5, 8, 9, 1...|[Alsatian, Austri...|Recoded Detailed ...|
|DIVISION|[0, 1, 2, 3, 4, 5...|[Puerto Rico, New...|Division code bas...|
|DIVISION|[0, 1, 2, 3, 4, 5...|[Puerto Rico, New...|Division code bas...|
|   FOD1P|[null_str, 1100, ...|[N/A (less than b...|Recoded field of ...|
+--------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [225]:
# Number fields: same treatment as the large categorical ones. Keep the metadata
# columns for the fields in md_cols and (re)create the table from them.
df_num = (
    df.filter((df.cardinality == 'number') & df.field.isin(md_cols))
    .select("field", "codes", "texts", "name_text")
)
df_num.writeTo("nessie.silver.md.number").createOrReplace()
df_num.show(5)

+------+--------------------+--------------------+--------------------+
| field|               codes|               texts|           name_text|
+------+--------------------+--------------------+--------------------+
|  AGEP|             [0, 99]|[Under 1 year, 1 ...|                 Age|
|MARHYP|[null_str, 1943, ...|[N/A (age less th...|   Year last married|
|   OIP|[null_str, 0, 99999]|[N/A (less than 1...|All other income ...|
|   PAP|[null_str, 0, 30000]|[N/A (less than 1...|Public assistance...|
| PERNP|[null_str, 0, -10...|[N/A (less than 1...|Total person's ea...|
+------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [229]:
%%sql
-- check everything is ok: list the tables under nessie. The four metadata tables
-- written by this notebook (silver.md.binary, largecat, number, smallcat)
-- should all be present.
SHOW TABLES IN nessie

namespace,tableName,isTemporary
bronze,census,False
bronze,ddm,False
silver.md,binary,False
silver.md,largecat,False
silver.md,number,False
silver.md,smallcat,False
stage,census,False
stage,dd,False


In [220]:
%%sql

-- Confirm the column names and types of the number metadata table.
DESCRIBE TABLE nessie.silver.md.number

col_name,data_type,comment
field,string,
codes,array<string>,
texts,array<string>,
name_text,string,


In [221]:
%%sql

-- Confirm the column names and types of the large categorical metadata table.
DESCRIBE TABLE nessie.silver.md.largecat

col_name,data_type,comment
field,string,
codes,array<string>,
texts,array<string>,
name_text,string,
