In [0]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import *

In [0]:
# Explicit schema for the storm-event "details" CSV files, one entry per file
# column in file order (presumably the NOAA Storm Events details layout —
# confirm against the source files). Every column is nullable. The date/time
# pieces (YEARMONTH, DAY, TIME) are kept as strings so they can be
# concatenated later without losing leading zeros.
schema_details = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        # when the event started / ended
        ("BEGIN_YEARMONTH", StringType()),
        ("BEGIN_DAY", StringType()),
        ("BEGIN_TIME", StringType()),
        ("END_YEARMONTH", StringType()),
        ("END_DAY", StringType()),
        ("END_TIME", StringType()),
        # identifiers and location
        ("EPISODE_ID", IntegerType()),
        ("EVENT_ID", IntegerType()),
        ("STATE", StringType()),
        ("STATE_FIPS", IntegerType()),
        ("YEAR", IntegerType()),
        ("MONTH_NAME", StringType()),
        ("EVENT_TYPE", StringType()),
        ("CZ_TYPE", StringType()),
        ("CZ_FIPS", IntegerType()),
        ("CZ_NAME", StringType()),
        ("WFO", StringType()),
        ("BEGIN_DATE_TIME", StringType()),
        ("CZ_TIMEZONE", StringType()),
        ("END_DATE_TIME", StringType()),
        # human and economic impact
        ("INJURIES_DIRECT", IntegerType()),
        ("INJURIES_INDIRECT", IntegerType()),
        ("DEATHS_DIRECT", IntegerType()),
        ("DEATHS_INDIRECT", IntegerType()),
        ("DAMAGE_PROPERTY", StringType()),
        ("DAMAGE_CROPS", StringType()),
        # event characteristics
        ("SOURCE", StringType()),
        ("MAGNITUDE", DoubleType()),
        ("MAGNITUDE_TYPE", StringType()),
        ("FLOOD_CAUSE", StringType()),
        ("CATEGORY", IntegerType()),
        # tornado-specific fields
        ("TOR_F_SCALE", StringType()),
        ("TOR_LENGTH", DoubleType()),
        ("TOR_WIDTH", IntegerType()),
        ("TOR_OTHER_WFO", StringType()),
        ("TOR_OTHER_CZ_STATE", StringType()),
        ("TOR_OTHER_CZ_FIPS", IntegerType()),
        ("TOR_OTHER_CZ_NAME", StringType()),
        # begin / end position
        ("BEGIN_RANGE", IntegerType()),
        ("BEGIN_AZIMUTH", StringType()),
        ("BEGIN_LOCATION", StringType()),
        ("END_RANGE", IntegerType()),
        ("END_AZIMUTH", StringType()),
        ("END_LOCATION", StringType()),
        ("BEGIN_LAT", DoubleType()),
        ("BEGIN_LON", DoubleType()),
        ("END_LAT", DoubleType()),
        ("END_LON", DoubleType()),
        # free text and provenance
        ("EPISODE_NARRATIVE", StringType()),
        ("EVENT_NARRATIVE", StringType()),
        ("DATA_SOURCE", StringType()),
    ]
])

# Explicit schema for the fatalities CSV files, one entry per file column in
# file order. With an explicit schema Spark's CSV reader applies the fields by
# POSITION (enforceSchema=true ignores the header names), so this list must
# contain exactly one field per column: an extra field shifts every later
# column into the wrong slot.
schema_fatalities = StructType([
    StructField("FAT_YEARMONTH", StringType(), True),
    StructField("FAT_DAY", StringType(), True),
    StructField("FAT_TIME", StringType(), True),
    # A stray "FATALITY_ID INT" field (DDL copy/paste leftover) used to sit
    # here and misaligned all following columns.
    StructField("FATALITY_ID", IntegerType(), True),
    StructField("EVENT_ID", IntegerType(), True),
    StructField("FATALITY_TYPE", StringType(), True),
    StructField("FATALITY_DATE", StringType(), True),
    StructField("FATALITY_AGE", IntegerType(), True),
    StructField("FATALITY_SEX", StringType(), True),
    # Text place descriptions (e.g. "In Water"), not a numeric code; as an
    # IntegerType every value would parse to null.
    StructField("FATALITY_LOCATION", StringType(), True),
    StructField("EVENT_YEARMONTH", StringType(), True),
])

In [0]:
# Azure Blob Storage location of the raw CSV files.
# The account access key must never be committed to the notebook: it is read
# from the AZURE_STORAGE_ACCOUNT_KEY environment variable instead (on Databricks,
# dbutils.secrets.get(scope=..., key=...) is the usual alternative).
storage_account_name = "pipelinestorageacctaus"
storage_account_access_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
blob_container = "severeweathercontainer"

# wasbs:// reads authenticate with this per-account Spark setting. When no key
# is supplied, rely on credentials already configured on the cluster.
if storage_account_access_key:
    spark.conf.set(
        "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
        storage_account_access_key,
    )

In [0]:
# Both datasets live in the same container, one sub-folder each, as .gz CSV
# files with a header row; the explicit schemas above fix column names/types.
detPath = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/details/*.gz"
fatPath = f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net/fatalities/*.gz"

df_details = spark.read.csv(detPath, schema=schema_details, header=True)
df_fatalities = spark.read.csv(fatPath, schema=schema_fatalities, header=True)

# count() is a Spark action: it scans every matching file.
print(f"count of details loaded to spark: {df_details.count()}")
print(f"count of fatalities loaded to spark: {df_fatalities.count()}")

In [0]:
# Register the details frame so the SQL cell below can query it as `details`.
df_details.createOrReplaceTempView(name="details")

In [0]:
# Build full begin/end date strings by appending a two-digit day to the
# year-month column: single-character day values get a leading "0"; any
# other value (including NULL) passes through unchanged.
# A triple-quoted string replaces the original backslash-newline continuations,
# which break with a SyntaxError if any trailing whitespace follows a "\".
spark.sql("""
    SELECT
        CONCAT(BEGIN_YEARMONTH, BEGIN_NEWD) AS BEGIN_FULLDATE,
        CONCAT(END_YEARMONTH, END_NEWD)     AS END_FULLDATE
    FROM (
        SELECT
            BEGIN_YEARMONTH,
            CASE WHEN LENGTH(BEGIN_DAY) = 1 THEN CONCAT('0', BEGIN_DAY) ELSE BEGIN_DAY END AS BEGIN_NEWD,
            END_YEARMONTH,
            CASE WHEN LENGTH(END_DAY) = 1 THEN CONCAT('0', END_DAY) ELSE END_DAY END AS END_NEWD
        FROM details
    ) d
""").show()

### Alternative method: the same result using DataFrame API syntax

In [0]:
# Same result as the SQL query above, via the DataFrame API: each day value of
# length 1 is left-padded with "0" and appended to its year-month column; the
# two derived columns are then displayed (first 10 rows).
(
    df_details
    .select(
        concat(
            col('BEGIN_YEARMONTH'),
            when(length(col('BEGIN_DAY')) == 1, concat(lit('0'), col('BEGIN_DAY')))
            .otherwise(col('BEGIN_DAY')),
        ).alias('BEGIN_FULLDATE'),
        concat(
            col('END_YEARMONTH'),
            when(length(col('END_DAY')) == 1, concat(lit('0'), col('END_DAY')))
            .otherwise(col('END_DAY')),
        ).alias('END_FULLDATE'),
    )
    .show(10)
)