## Packages

### Install packages

In [None]:
%sh
pip install adal

### Download packages from GitHub

In [None]:
! git clone https://github.com/YousefGh/kmeans-feature-importance.git
# Load python functions
exec(open('./kmeans-feature-importance/kmeans_interp/kmeans_feature_imp.py', encoding='utf-8').read())

### Import packages

In [None]:
import os
import re
import numpy as np
import pandas as pd
import datetime

import pyspark.sql.functions as pyssql_func
import pyspark.sql.types as pyssql_types
import pyspark.ml.feature as pysml_feature 
import pyspark.sql.window as pyssql_window
import pyspark
import delta.tables as delta_tbl
import shutil
import adal

import re

## Arguments Setup

In [None]:
%run
/path_to_where_you_placed_your_config_notebook/config

In [None]:
# Settings specific to this notebook
project_name = 'testing'

# Shared settings looked up on the `config` object from your config notebook
user_email_path = config.get('user_email_path')

In [None]:
## Arguments for scheduled jobs
# Register the "data_month" text widget (empty string by default)
dbutils.widgets.text("data_month", "")
# # remove argument widgets
# dbutils.widgets.removeAll()

# Keep the widget value when one was supplied; otherwise exit the notebook with a message
if dbutils.widgets.get("data_month") != "":
    data_month = dbutils.widgets.get("data_month")
else:
    dbutils.notebook.exit("Please add an argument of a process, data_month")

Please add an argument of a process, data_month

## Install customized functions

In [None]:
%run
/path_to_where_you_placed_your_functions/db_connection

## Connection to database

In [None]:
# Testing
# Read the table over JDBC, trim and rename two columns, keep at most 10 rows,
# then drop duplicate rows among them.
pushdown_query = "db_schema.db_tableA"
tableA = (
    spark.read
    .jdbc(
        url=db_access.jdbcUrl,
        table=pushdown_query,
        properties=db_access.connectionProperties,
    )
    .select(
        pyssql_func.trim('original_colA').alias('new_colA'),
        pyssql_func.trim('original_colB').alias('new_colB'),
    )
    .limit(10)
    .dropDuplicates()
)
display(tableA)

AA_UID,ALPHA_ID
00000032120B542752464790D434B58AD2F16D9093DFF1F5F5E96EBBE59003AB,17294044
000006DE941AC89B2DE4C19AE8E34F2FD2912A0910C3D8F2984161752A8D818D,13837911
000008FB9B4D7BB5D992A49D7D434B5A5A8EC260F0AA20D86951D0E4326CE726,4090272
00000AD348E8E7E93F1A7E0E4FC32D1A05CC3BA52313EE8BF4A5BD9DEFC80403,2171747
00000BF40D56E8BDB77D84AD24282950A33A4C7CDB9FF7018B327678A7F4B4DC,1644045
00000C6336F40932303917D564099BCD82E341A3F5B1919C6EA1A272937C1B58,623039
00000C6A767FCA4C24FCCD1A045EEB44705D947527520171A0675120D57A8872,1907945
00000E385A0924D4DBD7725856C29E4A9D403F2EABEB8DA7D11C82A60D2A50FF,1433646
00000F15BD7CD0777527781F5B80DE495F6FD46361CB548E874B6B810AE577FA,2317627
00000F788A2D554998AEA1412EAE67096C4D9D148F943593834AEF71FC19FCC8,3885064


## Directory

In [None]:
# Project folder layout on DBFS: one root plus four working sub-folders
project_dir = f'dbfs:/FileStore/{user_email_path}/Project/{project_name}'
upload_path = f'{project_dir}/upload/'
download_path = f'{project_dir}/download/'
input_path = f'{project_dir}/input/'
output_path = f'{project_dir}/output/'

# Create each sub-folder, in the same order as they are declared above
for project_subdir in (upload_path, download_path, input_path, output_path):
    dbutils.fs.mkdirs(project_subdir)

## Common utilities

### List Directory

In [None]:
# Show the contents of the user's Project folder
project_root_listing = dbutils.fs.ls(f'dbfs:/FileStore/{user_email_path}/Project/')
display(project_root_listing)

path,name,size
dbfs:/FileStore/tables/shared_uploads/carrie-sc.lo@aia.com/Project/HotMob/,HotMob/,0
dbfs:/FileStore/tables/shared_uploads/carrie-sc.lo@aia.com/Project/digital_wellness.csv,digital_wellness.csv,5804975
dbfs:/FileStore/tables/shared_uploads/carrie-sc.lo@aia.com/Project/testing/,testing/,0


### Copy Files

In [None]:
# Copy tableA.csv from the upload folder into the input folder (third argument True = recurse)
copy_source = f'{upload_path}/tableA.csv'
copy_target = f'{input_path}/tableA.csv'
dbutils.fs.cp(copy_source, copy_target, True)

### Move Files

In [None]:
# Move tableA.csv from the upload folder into the output folder (third argument True = recurse).
# The source path's f-string was missing its opening quote, which made the cell a SyntaxError.
dbutils.fs.mv(
    f'{upload_path}/tableA.csv',
    f'{output_path}/tableA.csv',
    True
)

### Read files

In [None]:
# read pickle
# re.sub drops the ':' so 'dbfs:/...' becomes the '/dbfs/...' form of the same path.
# `compression` is an argument of pd.read_pickle; it was previously passed to re.sub,
# which raises TypeError because re.sub has no such keyword.
# NOTE: only unpickle files you trust - loading a pickle can execute arbitrary code.
df = pd.read_pickle(re.sub(':', '', f'/{input_path}/tableA.pkl'), compression='gzip')

In [None]:
# Using spark to read csv
# Tab-delimited files with a header row, matched by the glob; only the first 5 rows are kept.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('delimiter', '\t')
    .load('/mnt/data_folder/*/*')
    .limit(5)
)

In [None]:
# Read data through Hive
from pyspark.sql import SparkSession, HiveContext,DataFrameWriter
from pyspark import SparkContext, SparkConf

# Spark configuration carrying the application name
conf = SparkConf()
conf.setAppName("Read-and-write-data-to-Hive-table-spark")
sc = SparkContext.getOrCreate(conf=conf)

# Instantiate the Hive context once to get access to the Hive sql method.
# NOTE(review): HiveContext is deprecated since Spark 2.0 in favour of
# SparkSession.builder.enableHiveSupport(); consider migrating.
hive_context = HiveContext(sc)
# Alias kept so any cell still referring to `hc` continues to work
# (previously a second, redundant HiveContext was constructed for it).
hc = hive_context

table_path = 'db_schema.db_tableA'
df = hive_context.sql(f"select * from {table_path}")

In [None]:
# Convert to pandas DF
# Guarded so the cell can be re-run: once `df` is already a pandas DataFrame it has
# no toPandas() method, and the unguarded reassignment raised AttributeError.
if not isinstance(df, pd.DataFrame):
    df = df.toPandas()

### Write files

In [None]:
# save spark df file
# coalesce(1) collapses the frame to one partition before the CSV write;
# any existing output at the target path is overwritten.
spark_csv_writer = spark_df.coalesce(1).write.mode('overwrite').format('csv')
spark_csv_writer.save(f'{output_path}/spark_df', header='true')

In [None]:
# `tableA` is built with spark.read above, and Spark DataFrames have no
# to_csv/to_pickle methods, so convert to pandas first. A frame that is already
# pandas (no toPandas attribute) is used as-is.
tableA_pd = tableA.toPandas() if hasattr(tableA, 'toPandas') else tableA
# save csv (re.sub drops the ':' so 'dbfs:/...' becomes the '/dbfs/...' form of the path)
tableA_pd.to_csv(re.sub(':', '', f'/{output_path}/tableA.csv'), index=False)
# save pickle
tableA_pd.to_pickle(re.sub(':', '', f'/{output_path}/tableA.pkl'))

In [None]:
# zip the whole directory
import shutil

filename = 'project_zip'
# 'dbfs:/...' -> '/dbfs/...' form of the project directory
sourcePath = re.sub(':', '', f'/{project_dir}')
zipPath = f'./{filename}'

# Build <filename>.zip in the current working directory from the project folder ...
shutil.make_archive(base_name=zipPath, format='zip', root_dir=sourcePath)
# ... then move the archive into the project folder itself
shutil.move(f'{filename}.zip', f'{sourcePath}/{filename}.zip')

### Delete file

In [None]:
# Delete the archive produced by the zip cell. The path is derived from `project_dir`
# (like every other cell) instead of a hard-coded placeholder e-mail/project path.
dbutils.fs.rm(f'{project_dir}/project_zip.zip', True)

## Download file to local laptop
https://xxxxxxxxxxxxxxxxxxxx.azuredatabricks.net/files/your_email@company.com/Project/testing/project_zip.zip