In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1552774118123_0009,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# catalog: database and table names
db_name = "glue"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"

In [3]:
# output s3 and temp directories
output_history_dir = "s3://pw-radek-glue-output/output-dir/legislator_history"
output_lg_single_dir = "s3://pw-radek-glue-output/output-dir/legislator_single"
output_lg_partitioned_dir = "s3://pw-radek-glue-output/output-dir/legislator_part"
redshift_temp_dir = "s3://pw-radek-glue-output/temp-dir/"

In [4]:
# Create dynamic frames from the source tables 
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)

In [5]:
print "Count: ", persons.count()
persons.printSchema()

Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

In [6]:
print "Count: ", memberships.count()
memberships.printSchema()

Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string

In [7]:
print "Count: ", orgs.count()
orgs.printSchema()

Count:  13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

In [8]:
# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers']) \
    .rename_field('id', 'org_id') \
    .rename_field('name', 'org_name')

In [9]:
orgs.printSchema()

root
|-- classification: string
|-- org_id: string
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

In [10]:
orgs.toDF().show()

+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|classification|              org_id|            org_name|               links|               image|seats|       type|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|         party|            party/al|                  AL|                null|                null| null|       null|
|         party|      party/democrat|            Democrat|[[website,http://...|https://upload.wi...| null|       null|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...|                null| null|       null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|                null|  435|lower house|
|         party|   party/independent|         Independent|                null|                null| null|       null|
|         party|party/new_progres...|     New Pr

In [11]:
memberships.select_fields(['organization_id']).toDF().distinct().show()

+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

In [12]:
# Join the frames to create history
l_history = Join.apply(orgs, 
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 
                       'organization_id') \
                .drop_fields(['person_id', 'org_id'])

In [13]:
print "Count: ", l_history.count()
l_history.printSchema()

Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- end_date: string

In [14]:
# Write out the dynamic frame into parquet in "legislator_history" directory
print "Writing to /legislator_history ..."
glueContext.write_dynamic_frame.from_options(
    frame = l_history, 
    connection_type = "s3", 
    connection_options = {"path": output_history_dir}, 
    format = "parquet")

Writing to /legislator_history ...
<awsglue.dynamicframe.DynamicFrame object at 0x7f87e0096990>

In [15]:
# Write out a single file to directory "legislator_single"
s_history = l_history.toDF().repartition(1)
print "Writing to /legislator_single ..."
s_history.write.parquet(output_lg_single_dir)

Writing to /legislator_single ...

In [16]:
# Convert to data frame, write to directory "legislator_part", partitioned by (separate) Senate and House.
print "Writing to /legislator_part, partitioned by Senate and House ..."
l_history.toDF().write.parquet(output_lg_partitioned_dir, partitionBy=['org_name'])

Writing to /legislator_part, partitioned by Senate and House ...