### Joining Tables with AWS Glue Studio

#### 1. Getting started

The dataset we'll be using is from S3, at:

    s3://imba-andy/features/

It contains data in parquet format.

Import libraries

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

#### 2. Joining

We will write a script that:

1. Set up a single `GlueContext`.
2. Creating dataframes from existing athena catelog
3. Combines up_feature_db, prd_feature_db, user_feature_db and user_feature_2_db into a single 
data set. This is often referred to as de-normalization.
4. Write the output as a single csv file to S3, at `s3://imba-andy/features-output/`

In [None]:
# create glue context first
glueContext = GlueContext(SparkContext.getOrCreate())
    
    
# creating dataframes from existing athena catelog
up_features = glueContext.create_dynamic_frame_from_options(connection_type = "parquet", connection_options = {"paths": ["s3://imba-andy/features/up-feature-db/"]})
prd_features = glueContext.create_dynamic_frame_from_options(connection_type = "parquet", connection_options = {"paths": ["s3://imba-andy/features/prd-feature-db/"]})
user_features_1 = glueContext.create_dynamic_frame_from_options(connection_type = "parquet", connection_options = {"paths": ["s3://imba-andy/features/user-feature-db/"]})
user_features_2 = glueContext.create_dynamic_frame_from_options(connection_type = "parquet", connection_options = {"paths": ["s3://imba-andy/features/user-feature-2-db/"]})

In [None]:
# join user features together
users = Join.apply(user_features_1.rename_field('user_id','user_id1'), user_features_2, 'user_id1', 'user_id').drop_fields(['user_id1'])
    
# join everything together
df = Join.apply(Join.apply(up_features, 
                      users.rename_field('user_id','user_id1'), 
                      'user_id','user_id1').drop_fields(['user_id1']),
          prd_features.rename_field('product_id','product_id1'), 
          'product_id','product_id1').drop_fields(['product_id1'])

In [None]:
# convert glue dynamic dataframe to spark dataframe
df_spark = df.toDF()
df_spark.repartition(1).write.mode('overwrite').format('csv').save("s3://imba-andy/features-output", header = 'true')