In [1]:
from pyspark import SparkContext
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col
import json

In [72]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType

In [2]:
import pandas as pd

In [3]:
# Initialize SparkContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [24]:
SCORECARD_PATH = "./data/Most-Recent-Cohorts-All-Data-Elements.csv"
V2_PATH = "./data/college-search-data-v2.parquet"
V3_PATH = "./data/college-search-data-v3.parquet"
DATA_TYPES_PATH = "./assets/datatypes.json"

In [58]:
def add_datatypes(data_columns):

    with open(DATA_TYPES_PATH) as f:
        cur_datatypes = json.load(f)

    for variable, datatype in data_columns.items():
        cur_datatypes[variable] = datatype
    
    with open(DATA_TYPES_PATH, 'w') as f:
        f.write(json.dumps(cur_datatypes, indent=4))

In [51]:
df = spark.read.load(V2_PATH)
print((df.count(), len(df.columns)))

(6694, 551)


## Edit current data types (Percentage)

In [52]:
percent_columns = [
    "DVADM02",
    "DVADM03",
    "DVADM08",
    "DVADM09",
    "SATPCT",
    "ACTPCT",
    "DVADM04",
    "RMOUSTTP",
    "RMINSTTP",
    "RMUNKNWP",
    "BAGR100",
    "BAGR150",
    "ANYAIDP",
    "PGRNT_P",
    "OFGRT_P",
    "FLOAN_P",
    "OLOAN_P",
    "UAGRNTP",
    "UPGRNTP",
    "UFLOANP",
    "PCTENRBK",
    "PCTENRHS",
    "PCTENRAP",
    "PCTENRAN",
    "PCTENRNR",
    "PCTENRUN"
]

In [53]:
with open(DATA_TYPES_PATH) as f:
    cur_datatypes = json.load(f)

for per_col in percent_columns:
    df = df.withColumn(per_col, F.col(per_col).cast("float"))
    df = df.withColumn(per_col, col(per_col) / 100)
    cur_datatypes[per_col] = "float"

with open(DATA_TYPES_PATH, 'w') as f:
    f.write(json.dumps(cur_datatypes, indent=4))

In [55]:
cur_columns = df.columns

## Merge Override Variables

In [59]:
# add new columns for override
hd2020_df = spark.read.csv("./data/v2_additional_data/HD2020.csv", header=True, inferSchema=True)
drvef2020_df = spark.read.csv("./data/v2_additional_data/DRVEF2020.csv", header=True, inferSchema=True)

override_nonexist_columns = {
    "WEBADDR": "string",
    "NPRICURL": "string",
    "PCTENR2M": "float"
}

add_datatypes(override_nonexist_columns)

In [63]:
override_hd = hd2020_df.select("UNITID", "WEBADDR", "NPRICURL")
override_hd = override_hd.withColumn("UNITID", F.col("UNITID").cast("string"))
override_drvef = drvef2020_df.select("UNITID", "PCTENR2M")
override_drvef = override_drvef.withColumn("UNITID", F.col("UNITID").cast("string"))

df = df.join(override_hd, "UNITID", "left")
df = df.join(override_drvef, "UNITID", "left")

In [67]:
df = df.withColumn("PCTENR2M", F.col("PCTENR2M").cast("float"))
df = df.withColumn("PCTENR2M", col("PCTENR2M") / 100)

In [68]:
df = df.replace({'NULL': None, 'null': None})

In [73]:
null_counts = df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).collect()[0].asDict()

# Save dictionary of null value counts for reference
with open('./assets/null_counts.json', 'w') as f:
    json.dump(null_counts, f)

In [69]:
def column_override(first, second):
    if first is None:
        return second
    return first

In [77]:
udf_1 = udf(column_override, StringType())
udf_2 = udf(column_override, FloatType())
df = df.withColumn("INSTURL", udf_1("WEBADDR", "INSTURL"))
df = df.withColumn("NPCURL", udf_1("NPRICURL", "NPCURL"))
df = df.withColumn("UGDS_2MOR", udf_2("PCTENR2M", "UGDS_2MOR"))
df = df.withColumn("UGDS_WHITE", udf_2("PCTENRWH", "UGDS_WHITE"))
df = df.withColumn("UGDS_BLACK", udf_2("PCTENRBK", "UGDS_BLACK"))
df = df.withColumn("UGDS_AIAN", udf_2("PCTENRAN", "UGDS_AIAN"))
df = df.withColumn("UGDS_NRA", udf_2("PCTENRNR", "UGDS_NRA"))
df = df.withColumn("UGDS_UNKN", udf_2("PCTENRUN", "UGDS_UNKN"))

In [78]:
# check whether override is success
null_counts_2 = df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).collect()[0].asDict()

# Save dictionary of null value counts for reference
with open('./assets/null_counts_v3.json', 'w') as f:
    json.dump(null_counts_2, f)

## Check Data Types

In [79]:
# use PySpark to check and convert data types
with open('./assets/datatypes.json') as f:
    datatypes = json.load(f)

# cast columns to correct datatypes
for field, datatype in datatypes.items(): # takes a few mins to run
    if field not in df.columns:
        continue
    curr_type = dict(df.dtypes)[field]
    if curr_type != datatype and not datatype.startswith(curr_type):
        df = df.withColumn(field, F.col(field).cast(datatype))

## Wrap Up

In [80]:
df.write.save(V3_PATH)

In [None]:
sc.stop()

## Non-override Variables (Already Included)

In [29]:
non_override_columns = {
    "CIP01BACHL": "integer",
    "CIP03BACHL": "integer",
    "CIP04BACHL": "integer",
    "CIP05BACHL": "integer",
    "CIP09BACHL": "integer",
    "CIP10BACHL": "integer",
    "CIP11BACHL": "integer",
    "CIP12BACHL": "integer",
    "CIP13BACHL": "integer",
    "CIP14BACHL": "integer",
    "CIP15BACHL": "integer",
    "CIP16BACHL": "integer",
    "CIP19BACHL": "integer",
    "CIP22BACHL": "integer",
    "CIP23BACHL": "integer",
    "CIP24BACHL": "integer",
    "CIP25BACHL": "integer",
    "CIP26BACHL": "integer",
    "CIP27BACHL": "integer",
    "CIP29BACHL": "integer",
    "CIP30BACHL": "integer",
    "CIP31BACHL": "integer",
    "CIP38BACHL": "integer",
    "CIP39BACHL": "integer",
    "CIP40BACHL": "integer",
    "CIP41BACHL": "integer",
    "CIP42BACHL": "integer",
    "CIP43BACHL": "integer",
    "CIP44BACHL": "integer",
    "CIP45BACHL": "integer",
    "CIP46BACHL": "integer",
    "CIP47BACHL": "integer",
    "CIP48BACHL": "integer",
    "CIP49BACHL": "integer",
    "CIP50BACHL": "integer",
    "CIP51BACHL": "integer",
    "CIP52BACHL": "integer",
    "CIP54BACHL": "integer",
    "NPT41_PUB": "integer",
    "NPT42_PUB": "integer",
    "NPT43_PUB": "integer",
    "NPT44_PUB": "integer",
    "NPT45_PUB": "integer",
    "NPT41_PRIV": "integer",
    "NPT42_PRIV": "integer",
    "NPT43_PRIV": "integer",
    "NPT44_PRIV": "integer",
    "NPT45_PRIV": "integer",
    "GRAD_DEBT_MDN": "float",
    "LO_INC_DEBT_MDN": "float",
    "MD_INC_DEBT_MDN": "float",
    "HI_INC_DEBT_MDN": "float",
    "FIRSTGEN_DEBT_MDN": "float",
    "NOTFIRSTGEN_DEBT_MDN": "float",
    "FIRST_GEN": "float",
    "UNEMP_RATE": "float",
    "MN_EARN_WNE_P10": "integer",
    "PCT10_EARN_WNE_P10": "integer",
    "PCT25_EARN_WNE_P10": "integer",
    "PCT75_EARN_WNE_P10": "integer",
    "PCT90_EARN_WNE_P10": "integer",
    "PCT10_EARN_WNE_P6": "integer",
    "PCT25_EARN_WNE_P6": "integer",
    "PCT75_EARN_WNE_P6": "integer",
    "PCT90_EARN_WNE_P6": "integer",
}

In [43]:
# add datatypes
add_datatypes(non_override_columns)

In [34]:
scorecard_df = spark.read.csv(SCORECARD_PATH, header=True, inferSchema=True)
print((scorecard_df.count(), len(scorecard_df.columns)))

(6694, 2989)


In [35]:
non_override_colnames = ["UNITID"]
for colname, datatype in non_override_columns.items():
    non_override_colnames.append(colname)

In [39]:
non_override_df = scorecard_df.select(*non_override_colnames)
non_override_df = non_override_df.withColumn("UNITID", F.col("UNITID").cast("string"))

In [46]:
print((non_override_df.count(), len(non_override_df.columns)))

(6694, 66)


In [47]:
df = df.join(non_override_df, "UNITID", "left")
print((df.count(), len(df.columns)))

(6694, 616)


In [50]:
col_set = set(df.columns)
print(len(col_set))
print(len(df.columns))

551
616
