# Imports

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build the session through the builder so this cell can be re-run safely:
# getOrCreate() returns the already-running session instead of trying to
# construct a second SparkContext in the same kernel (which raises an error).
spark = SparkSession.builder.appName("_file_transforms").getOrCreate()
# Keep `sc` available for any code that expects the underlying SparkContext.
sc = spark.sparkContext

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from skainet_spark import Pipeline, ValidatedPipeline, transform, Input, Output, Metadata

# DDL

## 6.2. files

In [4]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_files_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/files_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the `files` DDL table from the raw file/transform listing.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps rows that are parquet files, metadata
    entries or ddl-stage files, labels each row with a `type`
    ('Schema' / 'Exceptions' / 'Dataset'), reduces the result with `deduplicate`,
    validates it against `metadata_ddl_extracting_disabled` and writes it to
    `ddl_csv_extracting_disabled`.
    """
    raw_schema = metadata_extracting_disabled()
    csv_reader = spark.read.format("csv").option("header", "true").schema(raw_schema)
    raw_rows = csv_reader.load("./data/_file_transforms/file_transforms.csv")

    # Row-level predicates reused by the filter and by the `type` labelling below.
    is_parquet = F.col('extension') == F.lit('parquet')
    is_metadata = F.col('transform_type') == F.lit('Metadata')
    is_ddl_stage = F.col('stage') == F.lit('ddl')
    is_exception_stage = F.col('stage') == F.lit('exception')

    # Metadata entries win over exception-stage files; everything else is a dataset.
    file_type = (F.when(is_metadata, F.lit('Schema'))
                  .when(is_exception_stage, F.lit('Exceptions'))
                  .otherwise(F.lit('Dataset')))

    def format_modified(frame):
        # Render the timestamp as a fixed-format string.
        return frame.withColumn('modified', F.date_format(F.col('modified'), 'yyyy-MM-dd HH:mm:ss'))

    # A further alternative for the filter is currently disabled:
    #   ((F.col('stage') == F.lit('exception')) & (F.col('project').isin(['clubs', 'matches_history'])))
    files = Pipeline(raw_rows).where(is_parquet | is_metadata | is_ddl_stage)
    files = files.add_columns({'type': file_type})
    files = files.rename_columns({'path': 'file_id'})
    files = files.select(['file_id', 'type', 'project', 'stage', 'modified'])
    files = files.cast_columns({'modified': 'datetime'})
    files = files.transform(format_modified)
    files = files.distinct()
    files = files.transform(deduplicate)
    files = files.show_dimensions()

    checked = ValidatedPipeline(files, metadata_ddl_extracting_disabled).validate()
    checked.write_csv(ddl_csv_extracting_disabled)


def deduplicate(df):
    """Keep exactly one row per `file_id`, preferring the highest `type`.

    Within each `file_id` the rows are ordered by `type` descending (plain string
    order, so 'Schema' sorts before 'Exceptions', which sorts before 'Dataset')
    and only the first row is kept.

    `row_number()` is used instead of `rank()`: `rank()` assigns the same rank to
    rows that tie on `type`, so several rows of one `file_id` could survive.
    The remaining columns are appended to the ordering purely as tie-breakers so
    that the surviving row is deterministic.

    Parameters:
        df: Spark DataFrame containing at least the `file_id` and `type` columns.

    Returns:
        A DataFrame with the same columns as `df` and one row per `file_id`.
    """
    key_columns = ('file_id', 'type')
    tie_breakers = [F.desc(name) for name in df.columns if name not in key_columns]
    per_file = Window.partitionBy('file_id').orderBy(F.desc('type'), *tie_breakers)

    # Helper column name chosen so it cannot overwrite a real column called `rank`.
    position_col = '__dedup_position'
    return (df
            .withColumn(position_col, F.row_number().over(per_file))
            .where(F.col(position_col) == 1)
            .drop(position_col))

In [5]:
# Runs the `my_function` defined in the cell above (the `files` DDL transform).
# The name `my_function` is redefined in every later section, so this call executes
# whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 5 rows: 100
Validated count: 100
Exception count: 0


## 6.3. transforms

In [6]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_transforms_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/transforms_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the `transforms` DDL table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps the distinct `transform_name` values
    (exposed as `transform_id`), validates them against
    `metadata_ddl_extracting_disabled` and writes them to
    `ddl_csv_extracting_disabled`.
    """
    raw_schema = metadata_extracting_disabled()
    csv_reader = spark.read.format("csv").option("header", "true").schema(raw_schema)
    raw_rows = csv_reader.load("./data/_file_transforms/file_transforms.csv")

    transform_ids = Pipeline(raw_rows).rename_columns({'transform_name': 'transform_id'})
    transform_ids = transform_ids.select(['transform_id'])
    transform_ids = transform_ids.distinct()
    transform_ids = transform_ids.show_dimensions()

    checked = ValidatedPipeline(transform_ids, metadata_ddl_extracting_disabled).validate()
    checked.write_csv(ddl_csv_extracting_disabled)


In [7]:
# Runs the `my_function` defined in the cell above (the `transforms` DDL transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 1 rows: 23
Validated count: 23
Exception count: 0


## 6.4. file_TRANSFORM_transform (INPUT, OUTPUT, METADATA)

### INPUT

In [8]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_file_TRANSFORM_transform_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/file_INPUT_transform_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the file -> transform INPUT relationship table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps 'Input' rows that are parquet files or
    ddl-stage files, and writes the distinct (`file_id`, `transform_id`) pairs to
    `ddl_csv_extracting_disabled` after validating them against
    `metadata_ddl_extracting_disabled`.
    """
    schema = metadata_extracting_disabled()

    df = (spark.read
           .format("csv")
           .option("header", "true")
           .schema(schema)
           .load("./data/_file_transforms/file_transforms.csv"))

    is_parquet = F.col('extension') == F.lit('parquet')
    is_ddl_stage = F.col('stage') == F.lit('ddl')

    pipe = Pipeline(df)
    pipe = (pipe
            .where(F.col('transform_type') == F.lit('Input'))
            # No `transform_type == 'Metadata'` alternative here: the filter above
            # already keeps only 'Input' rows, so that disjunct could never match.
            .where(is_parquet | is_ddl_stage)
            .rename_columns({
                'path': 'file_id'
            })
            .rename_columns({
                'transform_name': 'transform_id'
            })
            .select([
                'file_id',
                'transform_id'
            ])
            .distinct()
            .show_dimensions()
           )

    validated_pipe = ValidatedPipeline(pipe, metadata_ddl_extracting_disabled)
    validated_pipe = validated_pipe.validate()

    validated_pipe.write_csv(ddl_csv_extracting_disabled)

In [9]:
# Runs the `my_function` defined in the cell above (the INPUT relationship transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 2 rows: 24
Validated count: 24
Exception count: 0


### OUTPUT

In [10]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_file_TRANSFORM_transform_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/file_OUTPUT_transform_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the transform -> file OUTPUT relationship table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps 'Output' rows that are parquet files or
    belong to the ddl / exception stages, and writes the distinct
    (`transform_id`, `file_id`) pairs to `ddl_csv_extracting_disabled` after
    validating them against `metadata_ddl_extracting_disabled`.
    """
    schema = metadata_extracting_disabled()

    df = (spark.read
           .format("csv")
           .option("header", "true")
           .schema(schema)
           .load("./data/_file_transforms/file_transforms.csv"))

    is_parquet = F.col('extension') == F.lit('parquet')
    is_ddl_or_exception_stage = F.col('stage').isin('ddl', 'exception')

    pipe = Pipeline(df)
    pipe = (pipe
            .where(F.col('transform_type') == F.lit('Output'))
            # No `transform_type == 'Metadata'` alternative here: the filter above
            # already keeps only 'Output' rows, so that disjunct could never match.
            # NOTE(review): exception-stage files are accepted here, while the `files`
            # section of this notebook only keeps parquet / Metadata / ddl rows --
            # confirm every `file_id` written here also exists in the files table.
            .where(is_parquet | is_ddl_or_exception_stage)
            .rename_columns({
                'path': 'file_id'
            })
            .rename_columns({
                'transform_name': 'transform_id'
            })
            # NOTE(review): column order is (transform_id, file_id), the reverse of the
            # INPUT and METADATA sections -- presumably deliberate; confirm.
            .select([
                'transform_id',
                'file_id'
            ])
            .distinct()
            .show_dimensions()
           )

    validated_pipe = ValidatedPipeline(pipe, metadata_ddl_extracting_disabled)
    validated_pipe = validated_pipe.validate()

    validated_pipe.write_csv(ddl_csv_extracting_disabled)

In [11]:
# Runs the `my_function` defined in the cell above (the OUTPUT relationship transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 2 rows: 81
Validated count: 81
Exception count: 0


### METADATA

In [12]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_file_TRANSFORM_transform_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/file_METADATA_transform_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the file -> transform METADATA relationship table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps the 'Metadata' rows, and writes the
    distinct (`file_id`, `transform_id`) pairs to `ddl_csv_extracting_disabled`
    after validating them against `metadata_ddl_extracting_disabled`.
    """
    schema = metadata_extracting_disabled()

    df = (spark.read
           .format("csv")
           .option("header", "true")
           .schema(schema)
           .load("./data/_file_transforms/file_transforms.csv"))

    pipe = Pipeline(df)
    pipe = (pipe
            # A further "parquet OR Metadata OR ddl" filter would be a no-op here:
            # every row that passes this filter already satisfies its Metadata
            # alternative, so it is not applied.
            .where(F.col('transform_type') == F.lit('Metadata'))
            .rename_columns({
                'path': 'file_id'
            })
            .rename_columns({
                'transform_name': 'transform_id'
            })
            .select([
                'file_id',
                'transform_id'
            ])
            .distinct()
            .show_dimensions()
           )

    validated_pipe = ValidatedPipeline(pipe, metadata_ddl_extracting_disabled)
    validated_pipe = validated_pipe.validate()

    validated_pipe.write_csv(ddl_csv_extracting_disabled)

In [13]:
# Runs the `my_function` defined in the cell above (the METADATA relationship transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 2 rows: 41
Validated count: 41
Exception count: 0


## 6.5. apps

In [14]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_apps_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/apps_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the `apps` DDL table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, keeps the distinct (`app_id`, `app_name`)
    pairs, validates them against `metadata_ddl_extracting_disabled` and writes
    them to `ddl_csv_extracting_disabled`.
    """
    raw_schema = metadata_extracting_disabled()
    csv_reader = spark.read.format("csv").option("header", "true").schema(raw_schema)
    raw_rows = csv_reader.load("./data/_file_transforms/file_transforms.csv")

    apps = Pipeline(raw_rows).select(['app_id', 'app_name'])
    apps = apps.distinct()
    apps = apps.show_dimensions()

    checked = ValidatedPipeline(apps, metadata_ddl_extracting_disabled).validate()
    checked.write_csv(ddl_csv_extracting_disabled)


In [15]:
# Runs the `my_function` defined in the cell above (the `apps` DDL transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 2 rows: 11
Validated count: 11
Exception count: 0


## 6.6. transform_STAGE_OF_app

In [16]:
@transform(spark,
    metadata_extracting_disabled = Metadata('/_file_transforms/raw_file_transforms_metadata.csv', spark),
    metadata_ddl_extracting_disabled = Metadata('/_file_transforms/ddl_transform_STAGE_OF_app_metadata.csv', spark),

    ddl_csv_extracting_disabled = Output('/_file_transforms/transform_STAGE_OF_app_ddl.csv')
)
def my_function(spark, metadata_extracting_disabled, metadata_ddl_extracting_disabled, ddl_csv_extracting_disabled):
    """Build the transform -> app STAGE_OF relationship table.

    Reads ./data/_file_transforms/file_transforms.csv with the schema returned by
    `metadata_extracting_disabled()`, drops rows whose `stage` is 'exception',
    keeps the distinct (`transform_id`, `app_id`) pairs, validates them against
    `metadata_ddl_extracting_disabled` and writes them to
    `ddl_csv_extracting_disabled`.
    """
    raw_schema = metadata_extracting_disabled()
    csv_reader = spark.read.format("csv").option("header", "true").schema(raw_schema)
    raw_rows = csv_reader.load("./data/_file_transforms/file_transforms.csv")

    # NOTE(review): if `Pipeline.where` follows Spark filter semantics, rows with a
    # null `stage` are dropped as well (the comparison evaluates to null) -- confirm.
    not_exception_stage = F.col('stage') != F.lit('exception')

    stage_links = Pipeline(raw_rows).where(not_exception_stage)
    stage_links = stage_links.rename_columns({'transform_name': 'transform_id'})
    stage_links = stage_links.select(['transform_id', 'app_id'])
    stage_links = stage_links.distinct()
    stage_links = stage_links.show_dimensions()

    checked = ValidatedPipeline(stage_links, metadata_ddl_extracting_disabled).validate()
    checked.write_csv(ddl_csv_extracting_disabled)


In [17]:
# Runs the `my_function` defined in the cell above (the transform_STAGE_OF_app transform).
# The name `my_function` is reused by every section of this notebook, so this call
# executes whichever definition was run last -- re-run the defining cell first.
# Only `spark` is passed; the other arguments are presumably injected by @transform -- confirm.
my_function(spark)

cols: 2 rows: 23
Validated count: 23
Exception count: 0
