In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Notebook parameters (Databricks widgets) with their default values.
dbutils.widgets.text("catalog_name", "priceriskanalysis")
dbutils.widgets.text("commodity", "CORN")
dbutils.widgets.text("statistic", "YIELD")

catalog_name = dbutils.widgets.get("catalog_name")
# NOTE(review): `commodity` is read here but not used anywhere else in this notebook.
commodity = dbutils.widgets.get("commodity")
statistic = dbutils.widgets.get("statistic")

# Bronze landing folder for each supported statistic; any other value is rejected.
# NOTE(review): the YIELD folder is spelled "yeild". It must match the folder the bronze
# ingestion actually writes to, so confirm on disk before "fixing" the spelling.
bronze_paths = {
    "YIELD": f"/Volumes/{catalog_name}/bronze/usda_bronze/yeild/",
    "PRODUCTION": f"/Volumes/{catalog_name}/bronze/usda_bronze/production/",
    "AREA HARVESTED": f"/Volumes/{catalog_name}/bronze/usda_bronze/area_harvested/",
    "PRICE RECEIVED": f"/Volumes/{catalog_name}/bronze/usda_bronze/price_received/",
}
if statistic not in bronze_paths:
    raise ValueError("Unknown Statistic")
bronze_path = bronze_paths[statistic]

In [0]:
# List every entry under the bronze folder chosen above and show it as a table.
files = dbutils.fs.ls(bronze_path)
display(files)

In [0]:
# Pick the bronze file to load.
# sorted() orders the FileInfo entries by their tuple fields. Presumably `path` comes first,
# so this selects the lexicographically last path. That is the "latest" file only if file
# names sort chronologically. NOTE(review): confirm the bronze naming scheme.
# An empty folder is reported explicitly instead of surfacing as an IndexError.
if not files:
    raise FileNotFoundError(f"No files found under {bronze_path}")
latest_json = sorted(files)[-1].path
print(latest_json)

In [0]:
# Load the selected bronze file with the JSON reader and preview it.
df = spark.read.json(latest_json)
display(df)

In [0]:
# Inspect the inferred schema of the raw bronze JSON (the next cell explodes its `data` field).
df.printSchema()

In [0]:
# One output row per element of the `data` array, held in a single column named `record`.
df_exp = df.select(explode(col("data")).alias("record"))
display(df_exp)

In [0]:
# Inspect the struct layout of `record` before flattening it.
df_exp.printSchema()

In [0]:
# Promote every field of the `record` struct to a top-level column.
df_silver = df_exp.selectExpr("record.*")
display(df_silver)

In [0]:
# Column names produced by flattening `record` (rendered as this cell's output).
df_silver.columns

In [0]:
# Columns kept in the silver layer, in output order. Every other flattened field is dropped.
silver_columns = [
    "year",
    "state_name",
    "state_alpha",
    "country_name",
    "commodity_desc",
    "statisticcat_desc",
    "Value",
    "CV (%)",
    "unit_desc",
    "agg_level_desc",
    "freq_desc",
    "group_desc",
    "sector_desc",
    "prodn_practice_desc",
    "util_practice_desc",
    "reference_period_desc",
    "source_desc",
    "load_time",
]
df_silver = df_silver.select(*silver_columns)

display(df_silver)


In [0]:
# Remove every comma from Value (presumably thousands separators) so the later numeric cast can parse it.
value_without_commas = regexp_replace(col("Value"), ",", "")
df_silver = df_silver.withColumn("Value", value_without_commas)

In [0]:
# Replace non-numeric marker strings with "0" ahead of the numeric cast:
#   Value  : "(D)"
#   CV (%) : "(D)", "(L)", "(H)" and the empty string
# Matching is done on the trimmed text. Anything else, including NULL, is kept unchanged.
# NOTE(review): if these follow USDA NASS conventions, "(D)" marks a withheld figure. A 0 is
# then indistinguishable from a real zero in aggregates, so consider NULL instead. Other
# marker codes are not mapped here.
value_markers = ["(D)"]
cv_markers = ["(D)", "(L)", "(H)", ""]

df_silver_1 = (
    df_silver
    .withColumn("Value", when(trim(col("Value")).isin(value_markers), "0").otherwise(col("Value")))
    .withColumn("CV (%)", when(trim(col("CV (%)")).isin(cv_markers), "0").otherwise(col("CV (%)")))
)

display(df_silver_1)

In [0]:
df_silver_2 = df_silver_1.withColumn("Value",col('Value').cast('float'))\
                      .withColumn("CV (%)",col('CV (%)').cast('float'))\
                       .withColumn("load_time",to_timestamp(col('load_time')))
display(df_silver_2)

In [0]:
df_silver_2 = df_silver_2.withColumnRenamed("CV (%)","CV")

In [0]:
for i in df_silver_2.columns:
     x = df_silver_2.filter(col(i).isNull()).count()
     print(f"{i}: {x}")

In [0]:
if statistic == "YIELD":
    silver_table = f"{catalog_name}.silver.usda_corn_yield"
elif statistic == "PRODUCTION":
    silver_table = f"{catalog_name}.silver.usda_corn_production"
elif statistic == "AREA HARVESTED":
    silver_table = f"{catalog_name}.silver.usda_corn_area_harvested"
elif statistic == "PRICE RECEIVED":
    silver_table = f"{catalog_name}.silver.usda_corn_price_received"
else:
    raise ValueError("Unknown statistic")
print(silver_table)

In [0]:
df_silver_2.write.mode("overwrite").format("delta").saveAsTable(silver_table)
