# WARNING!

In [None]:
# This is a draft version: the notebook will soon be extended with real, working code
# that uses data files and shows examples of how to transform the data.
# Please be patient.

# Setup

This is an example of how the Jupyter kernel for this notebook can be configured. The kernel definition goes into the following file (in my case Python was installed to /opt/anaconda3):
`/opt/anaconda3/share/jupyter/kernels/pyspark-cluster-64cores-8ex/kernel.json`

WARNINGS: 
- replace "yourenv" with your environment name and "yourhostname" with your real host name
- replace /opt/spark-3.0.0-bin-without-hadoop-local with your path to Spark (and version), and
  py4j-0.10.9-src.zip with the py4j version that ships with your Spark


Contents of the kernel.json:

```json
{
 "display_name": "pyspark-cluster-64cores-8ex",
 "language": "python",
 "argv": [
    "/opt/anaconda3/envs/yourenv/bin/python",
    "-m",
    "ipykernel",
    "-f",
    "{connection_file}"
 ],
 "env": {
    "ENV_NAME": "dev",
    "CAPTURE_STANDARD_OUT": "true",
    "CAPTURE_STANDARD_ERR": "true",
    "SEND_EMPTY_OUTPUT": "false",
    "PYSPARK_PYTHON": "/opt/anaconda3/envs/yourenv/bin/python",
    "PYSPARK_DRIVER_PYTHON": "/opt/anaconda3/envs/yourenv/bin/python",
    "SPARK_HOME": "/opt/spark-3.0.0-bin-without-hadoop-local",
    "PYTHONPATH": "/opt/spark-3.0.0-bin-without-hadoop-local/python/:/opt/spark-3.0.0-bin-without-hadoop-local/python/lib/py4j-0.10.9-src.zip",
    "PYTHONSTARTUP": "/opt/spark-3.0.0-bin-without-hadoop-local/python/pyspark/shell.py",
    "PYSPARK_SUBMIT_ARGS": "--driver-memory 8G --executor-memory 28G --num-executors 8  --executor-cores 8 --total-executor-cores 64 --conf spark.sql.shuffle.partitions=300 --conf spark.default.parallelism=300 --conf \"spark.executor.extraJavaOptions=-Djava.io.tmpdir=/mnt/sparkExecutorTmp\" --conf \"spark.driver.extraJavaOptions=-Djava.io.tmpdir=/mnt/sparkDriverTmp\" --packages io.delta:delta-core_2.12:0.7.0 --master spark://yourhostname:7077 pyspark-shell"
 }
}
```

# Imports

In [None]:
# Display the SparkSession; presumably created at kernel start by pyspark/shell.py
# via PYTHONSTARTUP (see kernel.json above).
spark

In [None]:
# Before using functions from the spark_framework package, call init(spark)
# to pass it a reference to the Spark session.
# NOTE(review): this star import is what brings load, save, show, ps, change, sql, cache,
# uncache_all, temp_table and groupby_count into scope for the cells below (none of them
# are defined in this notebook); consider importing them explicitly.
from spark_framework import *
init(spark)

# Show Environment

In [None]:
import os

# Echo the spark-submit arguments this kernel was launched with (see kernel.json), if set.
if os.environ.get("PYSPARK_SUBMIT_ARGS") is not None:
    print(os.environ["PYSPARK_SUBMIT_ARGS"])

In [None]:
# Re-display the SparkSession next to the environment dump above
# (same output as the cell under "Imports").
spark

# Main

## Connections

In [None]:
# Local filesystem connection: type="local" and url="file:///[root_dir]".
# root_dir is optional: without it there is no "root" folder, and every load/save
# call must be given the full path to the file or directory.
CONN_LOCAL = dict(type="local", url="file:///d/data")

In [None]:
# Distributed file system connection: the url has the form "schema://host[/root_dir]".
CONN_DFS = dict(type="dfs", url="adl://your_adls_name.azuredatalakestore.net")

In [None]:
# JDBC connection: type="jdbc" and "url" (a valid JDBC connection string) are mandatory.
# All other keys are optional and are passed through as Spark options, i.e.
# spark.read.option(param, value) when loading (presumably df.write.option(param, value)
# when saving). Typically driver, user and password are needed for JDBC from Spark.
# Credentials are taken from the environment so real secrets never have to be typed into
# the notebook; the placeholders below are used when the variables are not set.
import os

CONN_JDBC = {
    "type": "jdbc",
    "url": "jdbc:postgresql://yourservername:5432/yourdatabasename",
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("JDBC_USER", "user"),
    "password": os.environ.get("JDBC_PASSWORD", "******"),
}

## Loading Data - Examples

In [None]:
# Examples of the different ways to load data. They are disabled by default because the
# paths are placeholders; set RUN_LOAD_EXAMPLES = True after adjusting them.
RUN_LOAD_EXAMPLES = False

if RUN_LOAD_EXAMPLES:
    # local filesystem: single file (path + a_format, or a dict with "name"/"format")
    df = load(CONN_LOCAL, "dir/file.csv", a_format="csv", a_row_count=True)
    df = load(CONN_LOCAL, {"name": "dir/file.csv", "format": "csv"}, a_row_count=True)

    # local filesystem: folder
    df = load(CONN_LOCAL, "dir/", a_format="csv", a_row_count=True)

    # local filesystem: file mask
    df = load(CONN_LOCAL, "dir/*2019*.csv", a_format="csv", a_row_count=True)

    # DFS: single file
    df = load(CONN_DFS, "/path/file.csv", a_format="csv", a_row_count=True)

    # DFS: file mask
    df = load(CONN_DFS, "/path/*.csv", a_format="csv", a_row_count=True)

    # DFS: folder
    df = load(CONN_DFS, "/path/dir", a_format="csv", a_row_count=True)

    # JDBC: table name
    df = load(CONN_JDBC, "table_name", a_row_count=True)

## Loading data

In [None]:
# Table descriptor: "format" is mandatory and can be "csv", "csv.gz" or "parquet".
# For "csv" and "csv.gz" also specify "header": True/False and, optionally, "schema".
TABLE1 = dict(name="./../data/dir", format="parquet")

df1 = load(CONN_LOCAL, TABLE1, a_row_count=True, a_cache=True)

In [None]:
# Show a sample of the loaded data (a_limit=10 presumably caps the number of rows shown).
show(df1, a_limit=10)

## Saving to ADLS

In [None]:
TABLE_DFS = dict(name="/dest_path/dest_table", format="csv", header=True)

# CONN_DFS points at an ADLS account (adl://...); write df1 there as CSV with a header.
save(df1, CONN_DFS, TABLE_DFS)

## Saving to PostgreSQL

In [None]:
# Write df1 to PostgreSQL with fast write explicitly disabled.
save(df1, CONN_JDBC, "public.temp_table", a_fast_write=False)  # without fast write

In [None]:
# WARNING! The target table should already exist in PostgreSQL.
save(df1, CONN_JDBC, "public.temp_table")  # with fast write - the default

## Changing the dataframe

In [None]:
# Inspect df1 before transforming it (ps presumably prints the schema).
ps(df1)

In [None]:
# change() keyword arguments, listed (presumably) in the order change() applies them:
#  a_select, a_drop, a_replace, a_rename, a_add, a_distinct,
#  a_filter_df, a_filter_columns, a_filter_not_df, a_filter_not_columns,
#  a_where, a_select_end, a_drop_end, a_order_by
#
# Given that order, the example:
#  - keeps "some_key" in a_select so that a_where can still filter on it;
#  - renames field1 to a NEW name (renaming it to the existing "field2" would clash);
#  - sorts by the renamed column, because "field1" no longer exists after a_rename.
df2 = change(df1,
             a_select=["field1", "field2", "field3", "some_key"],
             a_add={"DUMMY": "1"},
             a_where="some_key <= 1000",
             a_replace={"field1": "cast(field1 as double)"},  # SQL expression
             a_rename={"field1": "field1_double"},
             a_order_by={"field1_double": "desc"},
             a_verbose_level=5)

In [None]:
# Inspect the transformed frame (ps presumably prints the schema).
ps(df2)

## SQL

In [None]:
# Run a Spark SQL query; {0} is presumably substituted with a temp view registered for the
# first DataFrame argument (df1) — TODO confirm against spark_framework.sql.
show(sql("select * from {0} order by field1 desc", df1))

## Caching

In [None]:
# Cache df1 explicitly (note: it was already loaded with a_cache=True above);
# a_row_count=True presumably also counts the rows.
cache(df1, a_row_count=True)

In [None]:
# Presumably releases every cached DataFrame, including df1 cached above.
uncache_all()

## Temp Table

In [None]:
# Register df1 as a temporary table; the returned name is kept in `name`
# (presumably for use in SQL queries).
name = temp_table(df1)

## Other

In [None]:
# Row counts per DUMMY value. DUMMY exists only on df2 (added via a_add in change() above),
# not on df1, so the grouping must run on df2.
show(groupby_count(df2, "DUMMY"))