<a href="https://colab.research.google.com/github/apfurlan/delta_to_athena/blob/main/write_delta_on_athena.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Write Delta Tables on Athena**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from boto.s3.connection import S3Connection
import time, boto3
from delta import *
import os

## **Configure Spark**

In [None]:
# Build a Spark session with the Delta Lake SQL extension and catalog enabled.
builder = (
    SparkSession.builder.appName("MyApp")
    # Spark expects ONE comma-separated string of Maven coordinates here.
    # PySpark stringifies config values, so a Python list would turn into
    # "['...', '...']", which is not a valid coordinate list.
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.fast.upload", True)
)

# NOTE(review): configure_spark_with_delta_pip may set spark.jars.packages
# itself -- confirm the hadoop-aws package is still resolved at runtime.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

## **Generate Manifest to connect to Athena**

In [None]:
# Shared Athena client used by the query helpers defined in this notebook.
athena = boto3.client("athena", region_name="us-east-1")

In [None]:
def build_hive_ddl(
        table_name, object_schema, symlink_location, partition_cols=None, verbose=False):
    """
    Build a ``CREATE EXTERNAL TABLE`` statement that reads a symlink manifest.

    :param table_name: name of the table to create (may be qualified as ``db.table``)
    :param object_schema: iterable of schema fields, e.g. ``pyspark.sql.DataFrame.schema``;
        each field must expose ``name`` and ``dataType.simpleString()``
    :param symlink_location: storage location of the symlink manifest, used verbatim
        in the ``LOCATION`` clause
    :param partition_cols: optional collection of column names that go into the
        ``PARTITIONED BY`` clause instead of the regular column list
    :param verbose: when true, print the generated DDL
    :return: the DDL statement as a string
    """
    partition_cols = [] if partition_cols is None else partition_cols

    def _render(fields):
        # Join name and type explicitly instead of replacing every ':' in
        # "name:type": nested types such as struct<a:int> contain colons
        # that must be preserved.
        return ','.join(
            f"{field.name} {field.dataType.simpleString()}" for field in fields
        )

    columns = _render(
        field for field in object_schema if field.name not in partition_cols
    )
    partition_schema = _render(
        field for field in object_schema if field.name in partition_cols
    )

    partition_clause = (
        f" PARTITIONED BY ({partition_schema}) " if partition_schema else ''
    )
    ddl = (
        f"CREATE EXTERNAL TABLE {table_name} ({columns})"
        + partition_clause
        + " ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
        + " STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'"
        + " OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'"
        + f" LOCATION '{symlink_location}'"
    )
    if verbose:
        print('Generated Hive DDL:\n' + ddl)
    return ddl

In [None]:
def run_athena_query(
        db_name, workgroup, query,
        output_location='s3://mater-dei-dl-562415927517/athena_output/',
        timeout=60, poll_interval=0.1):
    """
    Start a query on Athena, wait for it to finish and return its results.

    :param db_name: database used as the query execution context
    :param workgroup: Athena workgroup the query runs in
    :param query: SQL/DDL statement to execute
    :param output_location: S3 location where Athena writes the query output
    :param timeout: maximum number of seconds to wait for the query to finish
    :param poll_interval: seconds to sleep between status checks
    :return: the response of ``athena.get_query_results`` for the finished query
    :raises RuntimeError: if the query fails, is cancelled, or does not succeed
        within ``timeout`` seconds
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            "Database": db_name,
            "Catalog": "AwsDataCatalog",
        },
        ResultConfiguration={
            'OutputLocation': output_location
        },
        WorkGroup=workgroup
    )
    query_id = response["QueryExecutionId"]

    # Stop polling as soon as the query reaches any terminal state, not only
    # on success, so failures are reported without waiting for the timeout.
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")
    # A monotonic deadline also accounts for the time spent in the API calls.
    deadline = time.monotonic() + timeout
    while True:
        time.sleep(poll_interval)
        result = athena.get_query_execution(QueryExecutionId=query_id)
        status = result["QueryExecution"]["Status"]
        if status["State"] in terminal_states or time.monotonic() >= deadline:
            break

    if status["State"] == "SUCCEEDED":
        return athena.get_query_results(QueryExecutionId=query_id)

    print(result)
    # Raising a plain string is a TypeError in Python 3; raise a real exception.
    raise RuntimeError(
        "Not possible to run athena query "
        f"(state: {status['State']}, reason: {status.get('StateChangeReason')})"
    )

In [None]:
def add_delta_2_athena(bucket_name, delta_path, db_name, workgroup, table_name, partition_cols=None):
    """
    Register a Delta table stored on S3 as an external table on Athena.

    Does nothing (besides printing a message) when ``SHOW TABLES`` already
    lists ``table_name``. Otherwise the table schema is read from the Delta
    table at ``s3a://{bucket_name}/{delta_path}``, a DDL pointing at its
    ``_symlink_format_manifest/`` folder is executed, and the table is repaired.

    :param bucket_name: S3 bucket that holds the Delta table
    :param delta_path: path of the Delta table inside the bucket
    :param db_name: database in which the table is created
    :param workgroup: Athena workgroup used to run the queries
    :param table_name: name of the table to create
    :param partition_cols: optional list of partition column names
    :return: None
    """
    # Avoid a shared mutable default; always hand a real list to the DDL builder.
    partition_cols = [] if partition_cols is None else partition_cols

    existing = run_athena_query(db_name, workgroup, f"SHOW TABLES LIKE '{table_name}';")
    if len(existing['ResultSet']['Rows']) > 0:
        print('Table already existis!')
        return

    # Only the schema of the DataFrame is used to build the DDL.
    delta_df = DeltaTable.forPath(
        spark, f's3a://{bucket_name}/{delta_path}'
    ).toDF()

    ddl = build_hive_ddl(
        f'{db_name}.{table_name}',
        delta_df.schema,
        f's3://{bucket_name}/{delta_path}/_symlink_format_manifest/',
        partition_cols
    )
    run_athena_query(db_name, workgroup, ddl)
    # Repair Table
    run_athena_query(db_name, workgroup, f"MSCK REPAIR TABLE {table_name};")
    print('SUCCESS !!!!')

In [None]:
# Placeholders: replace with the real S3 bucket, database and Athena workgroup.
bucket_name = "bucket_name"
dbname = "dbname_on_glue"
workgroup = "primary"

In [None]:
full_load_path = f"{bucket_name}/path"

# Location of the Delta table inside the bucket and the table name to create.
table_path = "path_on_s3/folder_name"
table_name = "folder_name"

# Generate the symlink manifest for the Delta table first, then register the
# table on Athena.
stagingData = DeltaTable.forPath(spark, f"s3a://{bucket_name}/{table_path}")
stagingData.generate("symlink_format_manifest")
add_delta_2_athena(bucket_name, table_path, dbname, workgroup, table_name)

SUCCESS !!!!
