In [None]:
import multiprocessing
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.window as W
from pyspark.sql.utils import AnalysisException
import os
import math
import shutil, errno
import pandas as pd

In [None]:
def init_spark(name, mem="5gb", num_cpus=None):
    """ Initialize (or reuse) a local SparkSession.

    Parameters
    ----------
    name : str
        Spark application name.
    mem : str
        Value passed to `spark.executor.memory`.
    num_cpus : int or None
        Number of local worker threads; defaults to
        `multiprocessing.cpu_count()` when None.

    Returns
    -------
    SparkSession
        The session returned by `getOrCreate()`.
    """
    if num_cpus is None:
        num_cpus = multiprocessing.cpu_count()

    # A bare 'local' master runs Spark with ONE worker thread, which silently
    # ignored `num_cpus`; 'local[N]' is the local-mode way to request N threads.
    # `spark.cores.max` is kept so the recorded configuration is unchanged.
    return (
        SparkSession.Builder()
        .master(f'local[{num_cpus}]')
        .appName(name)
        .config('spark.executor.memory', mem)
        .config("spark.cores.max", str(num_cpus))
        .config("spark.logConf", "true")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )

In [None]:
def copydirectory(src, dst, override=False):
    """ Copy `src` to `dst`, optionally replacing an existing `dst`.

    Parameters
    ----------
    src : str
        Path to copy from. Copied with `shutil.copytree`; if that fails with
        ENOTDIR/EINVAL (presumably because `src` is a single file) it is
        copied with `shutil.copy` instead.
    dst : str
        Destination path.
    override : bool
        When falsy and `dst` already exists, nothing is copied. When truthy,
        an existing `dst` is removed first.

    Raises
    ------
    OSError
        Any copy error other than ENOTDIR/EINVAL is re-raised.
    """
    if not override and os.path.exists(dst):
        print(f'"{dst}" already exists, passing...')
        return

    if os.path.exists(dst):
        # rmtree only handles real directories; an existing file (or a symlink)
        # must be removed with os.remove, otherwise override=True would crash.
        if os.path.isdir(dst) and not os.path.islink(dst):
            shutil.rmtree(dst)
        else:
            os.remove(dst)
    try:
        shutil.copytree(src, dst)
    except OSError as exc:
        if exc.errno in (errno.ENOTDIR, errno.EINVAL):
            shutil.copy(src, dst)
        else:
            raise
    print(f'"{src}"\n copied to: "{dst}"')

### Initialize pyspark

In [None]:
spark = init_spark('dsgrid-load') # <--- same as a pyspark instance in shell
# Location of the load_data_lookup table to expand. The DSGRID_LOOKUP_FILE
# environment variable overrides the default so the notebook can run on
# another machine without editing this cell.
lookup_file = os.environ.get(
    'DSGRID_LOOKUP_FILE',
    '/Users/lliu2/Documents/dsGrid/dsgrid_v2.0.0/commercial/load_data_lookup.parquet',
) # <---


In [None]:
### relocate the original load_data_lookup
# backup path: same folder, file name suffixed with '_orig' before the extension
lookup_dir, lookup_name = os.path.split(lookup_file)
stem, ext = os.path.splitext(lookup_name)
file_rename = f'{stem}_orig{ext}'
relocated_file = os.path.join(lookup_dir, file_rename)

# execute (override=False: an existing backup is never replaced)
copydirectory(lookup_file, relocated_file, override=False)

### 1. Load data as a Spark df

In [None]:
### load data from relocated file
# Read from the '_orig' copy rather than `lookup_file`, so the unexpanded
# table stays available as the input of this notebook.
df_lookup = spark.read.parquet(relocated_file)
    
# df_lookup.show()

### 2. Get keys to enumerate on

In [None]:
# columns that are NOT enumerated; every other column is treated as a key
keys_to_exclude = ['scale_factor', 'data_id', 'id']
keys = [
    column
    for column in df_lookup.columns
    if column not in keys_to_exclude
]
print(keys)

### 3. Expand load_data_lookup to all combinations of keys; newly created combinations keep null values

In [None]:
if len(keys) <= 1:
    # zero or one key: there is nothing to cross, keep the table as is
    df_lookup_full = df_lookup
else:
    # cartesian product of the distinct values of every key column
    first_key, *other_keys = keys
    key_grid = df_lookup.select(first_key).distinct()
    for other_key in other_keys:
        other_values = df_lookup.select(other_key).distinct()
        key_grid = key_grid.crossJoin(other_values)

    # left join: combinations absent from the original table get null
    # values in the non-key columns
    df_lookup_full = key_grid.join(df_lookup, keys, 'left').sort(['id'] + keys)
    
# df_lookup_full.show()

### 4. Data Check
#### 4.1. Mapping Report

In [None]:
# Row counts before and after enumeration; the difference is the number of
# key combinations that had no row in the original table.
N_df_lookup = df_lookup.count()
N_df_lookup_full = df_lookup_full.count()
N_df_lookup_null = N_df_lookup_full - N_df_lookup
# Guard the percentage: an empty enumerated table would otherwise raise
# ZeroDivisionError in the middle of the report.
pct_null = N_df_lookup_null / N_df_lookup_full * 100 if N_df_lookup_full else 0.0
print(f'# rows in df_lookup: {N_df_lookup}')
print(f'# rows in df_lookup (fully enumerated): {N_df_lookup_full}')
print(f'# of rows without data: {N_df_lookup_null} ({pct_null:.02f}%)')

# df_lookup_full.show()

#### 4.2. Assertion checks

In [None]:
# 1) the enumerated row count must equal the product of the number of
#    distinct values of each key
N_enumerations = 1
for key in keys:
    n_distinct = df_lookup.select(key).distinct().count()
    N_enumerations = N_enumerations * n_distinct

assert N_enumerations == N_df_lookup_full

In [None]:
# 2) every id present before the enumeration is still present afterwards
df_lookup_ids = (
    df_lookup.select('id').distinct().toPandas().iloc[:, 0].values
)
df_lookup_full_ids = (
    df_lookup_full.select('id').distinct().toPandas().iloc[:, 0].values
)

missing_ids = set(df_lookup_ids) - set(df_lookup_full_ids)
assert not missing_ids

### 5. Save

In [None]:
def get_data_size(df, bytes_per_cell=64):
    """ Approximate the size of `df` from its cell count.

    Returns a tuple `(n_rows, n_cols, data_MB)` where `data_MB` is
    rows x columns x `bytes_per_cell`, expressed in MB (1 MB = 1e6 bytes).
    """
    row_count, col_count = df.count(), len(df.columns)
    total_bytes = row_count * col_count * bytes_per_cell

    return row_count, col_count, total_bytes / 1e6

def get_optimal_number_of_files(df, MB_per_file=128):
    """ Calculate the *optimal* number of files for writing `df`.

    The estimated size from `get_data_size` is divided by `MB_per_file` and
    rounded up. At least 1 is returned, so an empty dataset does not produce
    0, which is not a valid partition count for `repartition`.

    Parameters
    ----------
    df : Spark DataFrame
    MB_per_file : number
        Target size of each output file, in MB.

    Returns
    -------
    int
        Suggested number of files (>= 1).
    """
    _, _, data_MB = get_data_size(df)
    n_files = max(1, math.ceil(data_MB / MB_per_file))

    # report the target size actually used, not a hard-coded 128
    print(
        f"load_data_lookup is approximately {data_MB:.02f} MB in size, ideal to split into {n_files} file(s) at {MB_per_file} MB each."
    )

    return n_files

def file_size_if_partition_by(df, key):
    """ Report approximate partition sizes if `df` were partitioned by `key`.

    Sizes are estimates: the total from `get_data_size` is split in
    proportion to the number of rows of each distinct `key` value.

    Parameters
    ----------
    df : Spark DataFrame
    key : str
        Column to evaluate as a partitioning column.

    Returns
    -------
    pandas.DataFrame
        One column named `key`, indexed by 'n_partitions',
        'avg_partition_MB', 'max_partition_MB', 'min_partition_MB'.
        All sizes are 0.0 when `df` has no rows.
    """
    n_rows, _, data_MB = get_data_size(df)

    # A single aggregation over the per-value row counts replaces a separate
    # distinct-count plus two full sorts of the grouped table.
    stats = df.groupBy(key).count().agg(
        F.count(F.lit(1)).alias('n_partitions'),
        F.max('count').alias('n_rows_largest'),
        F.min('count').alias('n_rows_smallest'),
    ).first()
    n_partitions = stats['n_partitions']

    if n_partitions == 0:
        # empty frame: nothing to divide by, report zero-sized partitions
        avg_MB = largest_MB = smallest_MB = 0.0
    else:
        avg_MB = round(data_MB / n_partitions, 2)
        largest_MB = round(data_MB / n_rows * stats['n_rows_largest'], 2)
        smallest_MB = round(data_MB / n_rows * stats['n_rows_smallest'], 2)

    report = (
        f'Partitioning by "{key}" will yield: \n' +
        f'  - # of partitions: {n_partitions} \n' +
        f'  - avg partition size: {avg_MB} MB \n' +
        f'  - largest partition: {largest_MB} MB \n' +
        f'  - smallest partition: {smallest_MB} MB \n'
    )

    print(report)

    output = pd.DataFrame(
        {key: [n_partitions, avg_MB, largest_MB, smallest_MB]},
        index=['n_partitions', 'avg_partition_MB', 'max_partition_MB', 'min_partition_MB']
    )

    return output

#### 5.1. check partitioning choices and *optimal* # of sharded files

In [None]:
# Show column names and types of the enumerated table before evaluating
# each column as a partitioning candidate.
df_lookup_full.printSchema()

In [None]:
df_cols = df_lookup_full.columns

# one single-column stats frame per candidate column, placed side by side
partition_stats = pd.concat(
    [file_size_if_partition_by(df_lookup_full, col) for col in df_cols],
    axis=1,
)

partition_stats

In [None]:
## *optimal* # of files
# Based on the approximate table size and the helper's default per-file
# target; the result is passed to save_file below.
n_files = get_optimal_number_of_files(df_lookup_full)

In [None]:
def save_file(df, filepath, n_files=None, repartition_by=None):
    """ Write `df` as the parquet table "load_data_lookup" at `filepath`.

    Any existing output is overwritten (write mode "overwrite").

    Parameters
    ----------
    df : Spark DataFrame
    filepath : str
        Output location, passed as the writer's "path" option.
    n_files : int or None
        Number of target sharded files.
    repartition_by : column name / Column or None
        Column to repartition by.

    Note:
        - Not available for load_data_lookup: df.write.partitionBy().bucketBy()
        - df.coalesce(n): combine without shuffling, will not go larger than current_n_files
        - df.repartition(n): shuffle and try to evenly distribute, if n > # of unique rows, some partitions will be empty
        - df.repartition(col): shuffle and create partitions by unique record in col + 1 empty/very small partition
        - df.repartition(n, col): shuffle, number partitions = min(n, unique record in col)
    """

    current_n_parts = df.rdd.getNumPartitions()

    # Identity checks (`is not None`) instead of `!= None`: comparing a Column
    # with `!=` yields an object that cannot be used as a boolean.
    if n_files is not None and repartition_by is not None:
        df_out = df.repartition(n_files, repartition_by)
    elif repartition_by is not None:
        df_out = df.repartition(repartition_by)
    elif n_files is not None:
        df_out = df.repartition(n_files)
    else:
        df_out = df

    # for reporting out:
    n_out_parts = df_out.rdd.getNumPartitions()
    if repartition_by is not None:
        # estimate: one file per distinct value + 1, capped by the partition count
        n_out_files = min(df.select(repartition_by).distinct().count() + 1, n_out_parts)
        ext = f', repartitioned by {repartition_by}'
    else:
        # use the partition count AFTER repartitioning; the input count
        # under-reported the result whenever n_files exceeded it
        n_out_files = n_out_parts
        ext = ''
    print(f'Saving {current_n_parts} partitions --> {n_out_files} file(s){ext}...')

    df_out.write \
        .mode("overwrite") \
        .option("path", filepath)\
        .saveAsTable("load_data_lookup", format='parquet')
    

### Note:
- `write.partitionBy('col1','col2',...)`: export partitions by creating hierarchical subfolders (e.g., col1=0/col2=0/col3=.../part-0)
- `write.option("maxRecordsPerFile", n).partitionBy(col)`: use to control # of unique records (to n) per partition
- `coalesce(n).write`: combine into n partitions without shuffling, will not go larger than # of RDD files (spark default is 200)
- `repartition(n).write`: try to evenly distribute, if n > # of unique rows, some partitions will be empty
- `repartition(col).write`: create partitions by unique col field, 1 empty/very small partition will be created in addition to # of unique col records
- `repartition(n, col).write`: # files exported = min(n, # of unique fields for col)
- `repartition(n).write.partitionBy(col)`: create subfolder by unique col fields, each subfolder contains n partitions
- `write.partitionBy(col1).bucketBy(n_buckets, col2)`: distribute partitions into smaller pieces called buckets, col2 can not be the same as col1, good for reducing shuffles/exchanges when tables get joined, # of files exported = n_unique_fields_in_col1 x n_buckets x n_repartitions (if applicable)

File format: part-[partition#]-[bucket#]...snappy.parquet


### Example:
```python
df_lookup_full.repartition(3).write \
    .partitionBy("sector") \
    .bucketBy(2, "subsector") \
    .mode("overwrite") \
    .option("path", lookup_file)\
    .saveAsTable("load_data_lookup", format='parquet')
```
    
Outputs:

```
load_data_lookup.parquet
├── _SUCCESS
├── sector=com
│   ├── part-00000-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
│   ├── part-00000-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet
│   ├── part-00001-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
│   ├── part-00001-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet
│   ├── part-00002-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
│   └── part-00002-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet
└── sector=res
    ├── part-00000-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
    ├── part-00000-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet
    ├── part-00001-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
    ├── part-00001-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet
    ├── part-00002-4943b363-fbac-4665-8c76-d771c3f6cbbb_00000.c000.snappy.parquet
    └── part-00002-4943b363-fbac-4665-8c76-d771c3f6cbbb_00001.c000.snappy.parquet

2 directories (controlled by `partitionBy`), 13 files
```

In [None]:
# Write the enumerated table to `lookup_file` (write mode is "overwrite");
# the input of this notebook was read from the '_orig' copy.
save_file(df_lookup_full, lookup_file, n_files, repartition_by=None)