In [0]:
import requests
import json
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, LongType, TimestampType, BinaryType, IntegerType, DateType
from datetime import datetime
import time
import zipfile
import os
import pandas as pd 

In [0]:
# Notebook parameters.
# Each parameter is exposed as a text widget whose default value is simply
# the widget's own name; the values are then read back into the module-level
# variables `fab`, `period` and `table` that the following cells interpolate
# into their file paths.
for widget_name in ("fab", "period", "table"):
    dbutils.widgets.text(widget_name, widget_name)

fab, period, table = (
    dbutils.widgets.get(widget_name) for widget_name in ("fab", "period", "table")
)

# Example values for an interactive run (kept disabled):
# fab = "D21"
# table = "vw_D21_3S_TRANS_OIL_AZURE_MONTH"
# period = "M"

In [0]:
# Schema applied to the incremental CSV: every column is read as a nullable string.
file_info_schema = StructType([
    StructField('DATE', StringType(), True),
    StructField('STATION', StringType(), True),
    StructField('G', StringType(), True),
    StructField('QTY', StringType(), True),
    StructField('ETL_DATE', StringType(), True),
])


def _to_iso_timestamp(date_value):
    """Turn a ``YYYYMMDD`` value into a ``YYYY-MM-DDT06:00:00Z`` string.

    The hour is hard-coded to ``06`` by the format string. ``%M`` and ``%S``
    always render as ``00`` because the parsed input carries no time of day.

    Raises:
        ValueError: if ``str(date_value)`` does not match ``%Y%m%d``
            (this includes a null input, which is stringified to ``'None'``).
    """
    parsed = time.strptime(str(date_value), "%Y%m%d")
    return time.strftime("%Y-%m-%dT06:%M:%SZ", parsed)


# Use the already-imported `F.udf` instead of a bare `udf`, which is never
# imported in this notebook and only resolves if the runtime injects it.
convert_udf = F.udf(_to_iso_timestamp, StringType())  # python udf

source_path = f'/mnt/source/incremental/{table}.csv'
target_path = f'/mnt/deltalake/{table}'

# Read the incremental CSV, derive `timestamp` from DATE, rename QTY to `value`
# and append the four kept columns to the Delta table.
# The chain ends in `.save()`, which returns None, so the result is
# intentionally not bound to a name.
try:
    (spark.read.format('csv')
        .option('header', True)
        .schema(file_info_schema)
        .load(source_path)
        .withColumn('timestamp', convert_udf(F.col('DATE')))
        .withColumnRenamed('QTY', 'value')
        .select('timestamp', 'STATION', 'G', 'value')
        # Match the column's declared case so this does not depend on
        # case-insensitive column resolution.
        .repartition('STATION')
        .write
        .format('delta')
        .mode('append')
        .save(target_path))
except Exception as e:
    # Best-effort, as before: the failure is reported and the notebook goes on
    # to the export cell, which then works from whatever is already in Delta.
    # NOTE(review): confirm that continuing after a failed ingest is intended.
    print(f'Ingest of {source_path} into {target_path} failed: {e!r}')

In [0]:
# pandas df
# Export the Delta table as one zip archive per STATION. Each archive holds
# one CSV per G value with the columns `timestamp` and `value`, sorted by
# `timestamp`. Files are staged under /databricks/driver and the finished zip
# is moved to /mnt/zipfile/{fab}/{period}/.
# NOTE: toPandas() collects the whole table onto the driver.
raw_df = spark.read.format('delta').load(f'/mnt/deltalake/{table}')
raw_df_pandas = raw_df.toPandas()

local_root = f'/databricks/driver/{fab}/{period}'

# A single groupby pass per level replaces re-masking the full frame for every
# (STATION, G) pair. sort=False keeps the first-seen order that unique() gave.
# Rows with a null STATION or G are skipped, because groupby drops null keys.
for name, station_df in raw_df_pandas.groupby('STATION', sort=False):
    print(name + ' ziptime :' + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    station_dir = f'{local_root}/{name}'
    zip_path = f'{station_dir}.zip'
    dbutils.fs.mkdirs(f'file:{station_dir}')

    # Track exactly which CSVs this run produced, so that leftovers from an
    # earlier, interrupted run in the same directory never end up in the zip.
    csv_names = []
    for g, g_df in station_df.groupby('G', sort=False):
        csv_name = f'{g}.csv'
        (g_df[['timestamp', 'value']]
            .sort_values(by=['timestamp'])
            .reset_index(drop=True)
            .to_csv(f'{station_dir}/{csv_name}', index=False))
        csv_names.append(csv_name)

    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for csv_name in csv_names:
            # Store each file under its bare name, without the directory path.
            z.write(f'{station_dir}/{csv_name}', csv_name)

    dbutils.fs.mv(f'file:{zip_path}', f'/mnt/zipfile/{fab}/{period}/{name}.zip')
    dbutils.fs.rm(f'file:{station_dir}', True)

In [0]:
# spark df -- disabled alternative that filters per station/G with Spark before converting to pandas
# list_station = df.select('STATION').drop_duplicates().rdd.collect()
# for row in list_station:
#     print(row.STATION + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    
#     df_station = df.filter(F.col('STATION') == row.STATION )
#     list_g = df_station.select('G').drop_duplicates().rdd.collect()
    
#     dbutils.fs.mkdirs(f'file:/databricks/driver/{row.STATION}')
#     for rowg in list_g:
#         df_station_g = (df_station.filter(F.col('G') == rowg.G )
#                        .select('timestamp','value')
#                        )
#         df_station_g_pandas = df_station_g.toPandas()
#         df_station_g_pandas.to_csv(f'/databricks/driver/{row.STATION}/{rowg.G}.csv',index=False)
    
#     z = zipfile.ZipFile(f'/databricks/driver/{row.STATION}.zip', "w", zipfile.ZIP_DEFLATED)
#     for f in os.listdir(f'/databricks/driver/{row.STATION}'):
#         if f.endswith("csv"):
#             z.write(f'/databricks/driver/{row.STATION}/{f}',f)
#     z.close()
#     dbutils.fs.mv(f'file:/databricks/driver/{row.STATION}.zip', f'/mnt/zipfile/{row.STATION}.zip')
#     dbutils.fs.rm(f'file:/databricks/driver/{row.STATION}', True)