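# CONNECTING TO AWS S3 FROM PYSPARK WITH HIVE SUPPORT

Reads the employee and pharmacy JSON datasets from an S3 bucket, flattens their nested struct/array fields into plain columns, and saves each result as an ORC-backed Hive table in the `dev_usr` database.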

In [49]:
import getpass
import os

from pyspark.sql import SparkSession, HiveContext
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
# Build the Hive-enabled Spark session used by every cell below.
# The jar directory and metastore URI can be overridden through environment variables,
# so the notebook is not tied to one user's home directory. The defaults are the
# values this notebook was originally written against.
HADOOP_TOOLS_LIB = os.environ.get(
    "HADOOP_TOOLS_LIB", "/home/mouli/hadoop-2.9.1/share/hadoop/tools/lib")
HIVE_METASTORE_URI = os.environ.get("HIVE_METASTORE_URI", "thrift://localhost:9083")

# Comma-separated jar list for `spark.jars`: the AWS SDK bundle plus hadoop-aws (S3 filesystems).
S3_JARS = ",".join(
    os.path.join(HADOOP_TOOLS_LIB, jar_name)
    for jar_name in ("aws-java-sdk-bundle-1.11.199.jar", "hadoop-aws-2.9.1.jar"))

spark = (
    SparkSession.builder
    .appName("s3 bucket connect")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "10g")
    .config("spark.executor.instances", "10")
    .config("spark.jars", S3_JARS)
    .config("hive.metastore.uris", HIVE_METASTORE_URI)
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Location of the employee dataset as "<bucket>/<key>".
# Despite the variable name, this is a full object path, not just a bucket name.
buketName = "awsbucket25/employee.json"

AWS SECURITY CREDENTIALS FOR PROGRAMMATIC ACCESS TO THE S3 BUCKET

In [4]:
# Never hard-code AWS keys in a notebook: they end up in version control and shared copies.
# Read them from the standard AWS environment variables and fall back to an
# interactive prompt that does not echo the value.
AWS_ACCESS_KEY_ID = (os.environ.get("AWS_ACCESS_KEY_ID")
                     or getpass.getpass("AWS access key id: "))
AWS_SECRET_ACCESS_KEY = (os.environ.get("AWS_SECRET_ACCESS_KEY")
                         or getpass.getpass("AWS secret access key: "))

In [5]:
# Underlying SparkContext: used for RDD reads (textFile) and for the Hadoop configuration.
sc = spark.sparkContext

SETTING THE HADOOP S3 CREDENTIAL PARAMETERS FROM PYSPARK

In [6]:
# Hand the AWS credentials to the Hadoop configuration for the s3n:// filesystem used by
# the read cells. NOTE: `_jsc` is PySpark's private handle on the JavaSparkContext.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)

READING THE EMPLOYEE JSON DATASET FROM S3

In [7]:
# Read the employee JSON file from S3 as an RDD of text lines (one JSON record per line).
employee_s3_uri = "s3n://{}".format(buketName)
inputFile = sc.textFile(employee_s3_uri)

In [8]:
# Parse each RDD line as a JSON record; no schema is supplied, so Spark infers it.
data = spark.read.json(path=inputFile)

In [9]:
# Inspect the inferred schema before flattening it.
# The output below shows one top-level array of employee structs.
data.printSchema()

root
 |-- Employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- emailAddress: string (nullable = true)
 |    |    |-- employeeCode: string (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- jobTitleName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
 |    |    |-- phoneNumber: string (nullable = true)
 |    |    |-- preferredFullName: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |    |-- userId: string (nullable = true)



In [10]:
# Preview the raw (still nested) rows; long cell values are truncated.
data.show(n=20, truncate=True)

+--------------------+
|           Employees|
+--------------------+
|[[romin.k.irani@g...|
+--------------------+



In [11]:
# Top-level column names of the raw employee frame.
list(data.columns)

['Employees']

In [36]:
def function_to_flatten_json(input_df, verbose=True):
    """Flatten every struct and array column of a Spark DataFrame.

    Each pass does two things:
      1. explodes every array column into one row per element;
      2. promotes each field of every struct column to a top-level column
         named ``<struct>_<field>``.
    Passes repeat until no struct or array column remains, so nesting of any
    depth is flattened.

    Within a pass the output column order is: flat columns, then the exploded
    array columns, then the promoted struct fields. This order becomes the
    column order of any table saved from the result.

    Caveats:
      * ``F.explode`` drops rows whose array is null or empty.
      * Exploding several array columns of one row yields the cross product
        of their elements.

    Args:
        input_df: Spark DataFrame to flatten.
        verbose: if True, print the column classification of every pass.

    Returns:
        The fully flattened DataFrame.
    """
    flattened_df = input_df
    while True:
        flat_columns = [name for name, dtype in flattened_df.dtypes
                        if not dtype.startswith(('struct', 'array'))]
        nested_columns = [name for name, dtype in flattened_df.dtypes
                          if dtype.startswith('struct')]
        array_columns = [name for name, dtype in flattened_df.dtypes
                         if dtype.startswith('array')]
        if verbose:
            print("flat_columns : ", flat_columns)
            print("array_columns", array_columns)
            print("nested_columns :", nested_columns)

        # One explode per select (Spark allows a single generator per select);
        # the exploded column keeps its name but moves to the end.
        for array_column in array_columns:
            other_arrays = [x for x in array_columns if x != array_column]
            flattened_df = flattened_df.select(
                flat_columns + nested_columns + other_arrays
                + [F.explode(array_column).alias(array_column)])
            if verbose:
                print(flattened_df.columns)

        # Promote struct fields one level: parent.child -> parent_child.
        flattened_df = flattened_df.select(
            flat_columns + array_columns
            + [F.col(nested + '.' + field).alias(nested + '_' + field)
               for nested in nested_columns
               for field in flattened_df.select(nested + '.*').columns])
        if verbose:
            print("input_dataframe ",
                  ",".join(dtype[:6] for _, dtype in flattened_df.dtypes))

        # Exploded arrays of structs (or deeper nesting) need another pass.
        if not any(dtype.startswith(('struct', 'array'))
                   for _, dtype in flattened_df.dtypes):
            return flattened_df


field_info_df = function_to_flatten_json(data)

('flat_columns : ', [])
('array_columns', ['Employees'])
('nested_columns :', [])
['Employees']
('input_df ', 'struct')
('flat_columns : ', [])
('array_columns', [])
('nested_columns :', ['Employees'])
('input_df ', 'string,string,string,string,string,string,string,string,string')


In [37]:
# Only a preview is displayed. toPandas() pulls every row into the driver
# (spark.driver.memory is 2g here), so collect just the rows that are shown.
PREVIEW_ROWS = 5
field_info_df_pandas = field_info_df.limit(PREVIEW_ROWS).toPandas()

In [38]:
# Transposed preview: one row per flattened column, one column per record.
field_info_df_pandas.head(n=5).transpose()

Unnamed: 0,0,1,2
Employees_emailAddress,romin.k.irani@gmail.com,neilrirani@gmail.com,tomhanks@gmail.com
Employees_employeeCode,E1,E2,E3
Employees_firstName,Romin,Neil,Tom
Employees_jobTitleName,Developer,Developer,Program Directory
Employees_lastName,Irani,Irani,Hanks
Employees_phoneNumber,408-1234567,408-1111111,408-2222222
Employees_preferredFullName,Romin Irani,Neil Irani,Tom Hanks
Employees_region,CA,CA,CA
Employees_userId,rirani,nirani,thanks


SAVING THE FLATTENED EMPLOYEE DATA FROM S3 TO A HIVE TABLE

In [39]:
# Persist the flattened employee data as an ORC table, replacing any previous version.
field_info_df.write.saveAsTable("dev_usr.employee_information",
                                format="orc", mode="overwrite")

In [40]:
# List the databases known to the Hive metastore (dev_usr should appear).
show_db = spark.sql('show databases')
show_db.show(n=20, truncate=True)

+------------+
|databaseName|
+------------+
|     default|
|     dev_usr|
+------------+



In [41]:
# Switch the session's current database to dev_usr, then list its tables.
# NOTE: 'use' changes session state for every later unqualified table name.
use_db = spark.sql('use dev_usr')
show_tables = spark.sql('show tables')
show_tables.show(n=20, truncate=True)

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| dev_usr|employee_information|      false|
+--------+--------------------+-----------+



In [42]:
# Read the saved employee table back through Hive to verify the write.
employee_table_query = "select * from dev_usr.employee_information"
show_info = spark.sql(employee_table_query)

In [43]:
# Display the round-tripped rows (long values truncated).
show_info.show(n=20, truncate=True)

+----------------------+----------------------+-------------------+----------------------+------------------+---------------------+---------------------------+----------------+----------------+
|Employees_emailAddress|Employees_employeeCode|Employees_firstName|Employees_jobTitleName|Employees_lastName|Employees_phoneNumber|Employees_preferredFullName|Employees_region|Employees_userId|
+----------------------+----------------------+-------------------+----------------------+------------------+---------------------+---------------------------+----------------+----------------+
|  romin.k.irani@gma...|                    E1|              Romin|             Developer|             Irani|          408-1234567|                Romin Irani|              CA|          rirani|
|  neilrirani@gmail.com|                    E2|               Neil|             Developer|             Irani|          408-1111111|                 Neil Irani|              CA|          nirani|
|    tomhanks@gmail.com|      

READING THE PHARMACY JSON DATASET FROM S3

In [44]:
# "<bucket>/<key>" location of the pharmacy-medication dataset.
bucket_name = "awsbucket25/pharmacy-medication.json"
pharmacy_s3_uri = "s3n://{}".format(bucket_name)
# NOTE(review): despite its name, input_schema is an RDD of JSON text lines, not a schema.
input_schema = sc.textFile(pharmacy_s3_uri)

In [45]:
# Parse each pharmacy RDD line as a JSON record; the schema is inferred by Spark.
pharm_data = spark.read.json(path=input_schema)

In [46]:
# Inspect the inferred schema. It mixes flat, struct and array columns,
# which the flattening cell below expands.
pharm_data.printSchema()

root
 |-- careTeam: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- provider: struct (nullable = true)
 |    |    |    |-- reference: string (nullable = true)
 |    |    |-- sequence: long (nullable = true)
 |-- created: string (nullable = true)
 |-- diagnosis: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- diagnosisCodeableConcept: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |-- sequence: long (nullable = true)
 |-- id: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- insurance: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coverage: struct

In [47]:
def function_to_flatten_json(input_df, verbose=True):
    """Flatten every struct and array column of a Spark DataFrame.

    Redefined here (same name as the employee section's flattener) so this
    cell runs on its own.

    Each pass does two things:
      1. explodes every array column into one row per element;
      2. promotes each field of every struct column to a top-level column
         named ``<struct>_<field>``.
    Passes repeat until no struct or array column remains.

    Within a pass the output column order is: flat columns, then the exploded
    array columns, then the promoted struct fields. This order becomes the
    column order of any table saved from the result.

    Caveats:
      * ``F.explode`` drops rows whose array is null or empty.
      * Exploding several array columns of one row yields the cross product
        of their elements, so row counts can grow quickly.

    Args:
        input_df: Spark DataFrame to flatten.
        verbose: if True, print the column classification of every pass.

    Returns:
        The fully flattened DataFrame.
    """
    flattened_df = input_df
    while True:
        flat_columns = [name for name, dtype in flattened_df.dtypes
                        if not dtype.startswith(('struct', 'array'))]
        nested_columns = [name for name, dtype in flattened_df.dtypes
                          if dtype.startswith('struct')]
        array_columns = [name for name, dtype in flattened_df.dtypes
                         if dtype.startswith('array')]
        if verbose:
            print("flat_columns : ", flat_columns)
            print("array_columns", array_columns)
            print("nested_columns :", nested_columns)

        # One explode per select (Spark allows a single generator per select);
        # the exploded column keeps its name but moves to the end.
        for array_column in array_columns:
            other_arrays = [x for x in array_columns if x != array_column]
            flattened_df = flattened_df.select(
                flat_columns + nested_columns + other_arrays
                + [F.explode(array_column).alias(array_column)])
            if verbose:
                print(flattened_df.columns)

        # Promote struct fields one level: parent.child -> parent_child.
        flattened_df = flattened_df.select(
            flat_columns + array_columns
            + [F.col(nested + '.' + field).alias(nested + '_' + field)
               for nested in nested_columns
               for field in flattened_df.select(nested + '.*').columns])
        if verbose:
            print("input_dataframe ",
                  ",".join(dtype[:6] for _, dtype in flattened_df.dtypes))

        # Exploded arrays of structs (or deeper nesting) need another pass.
        if not any(dtype.startswith(('struct', 'array'))
                   for _, dtype in flattened_df.dtypes):
            return flattened_df


field_info_df = function_to_flatten_json(pharm_data)

('flat_columns : ', ['created', 'id', 'resourceType', 'status', 'use'])
('array_columns', ['careTeam', 'diagnosis', 'identifier', 'insurance', 'item', 'supportingInfo'])
('nested_columns :', ['insurer', 'originalPrescription', 'patient', 'payee', 'prescription', 'priority', 'provider', 'text', 'total', 'type'])
['created', 'id', 'resourceType', 'status', 'use', 'insurer', 'originalPrescription', 'patient', 'payee', 'prescription', 'priority', 'provider', 'text', 'total', 'type', 'diagnosis', 'identifier', 'insurance', 'item', 'supportingInfo', 'careTeam']
['created', 'id', 'resourceType', 'status', 'use', 'insurer', 'originalPrescription', 'patient', 'payee', 'prescription', 'priority', 'provider', 'text', 'total', 'type', 'careTeam', 'identifier', 'insurance', 'item', 'supportingInfo', 'diagnosis']
['created', 'id', 'resourceType', 'status', 'use', 'insurer', 'originalPrescription', 'patient', 'payee', 'prescription', 'priority', 'provider', 'text', 'total', 'type', 'careTeam', 'diagn

['created', 'id', 'resourceType', 'status', 'use', 'insurer_reference', 'originalPrescription_reference', 'patient_reference', 'prescription_reference', 'provider_reference', 'text_div', 'text_status', 'total_currency', 'total_value', 'careTeam_sequence', 'diagnosis_sequence', 'identifier_system', 'identifier_value', 'insurance_focal', 'insurance_sequence', 'item_sequence', 'item_servicedDate', 'supportingInfo_sequence', 'item_careTeamSequence', 'item_informationSequence', 'priority_coding_code', 'type_coding_code', 'type_coding_system', 'careTeam_provider_reference', 'insurance_coverage_reference', 'item_net_currency', 'item_net_value', 'item_quantity_code', 'item_quantity_system', 'item_quantity_unit', 'item_quantity_value', 'supportingInfo_valueQuantity_value', 'item_detail', 'payee_type_coding', 'item_productOrService_coding', 'supportingInfo_category_coding', 'supportingInfo_code_coding', 'diagnosis_diagnosisCodeableConcept_coding']
['created', 'id', 'resourceType', 'status', 'use

('input_df ', 'string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint,string,string,boolea,bigint,bigint,string,bigint,bigint,bigint,string,string,string,string,string,string,bigint,string,string,string,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,bigint,string,string')


In [51]:
# Only a preview is displayed. toPandas() pulls every row into the driver
# (spark.driver.memory is 2g here), and the flattened pharmacy frame is multiplied
# by its array explosions, so collect just the rows that are shown.
PREVIEW_ROWS = 5
field_info_df_pandas = field_info_df.limit(PREVIEW_ROWS).toPandas()
field_info_df_pandas.head(PREVIEW_ROWS).T

Unnamed: 0,0,1,2,3,4
created,2014-08-16,2014-08-16,2014-08-16,2014-08-16,2014-08-16
id,760151,760151,760151,760151,760151
resourceType,Claim,Claim,Claim,Claim,Claim
status,active,active,active,active,active
use,claim,claim,claim,claim,claim
insurer_reference,Organization/2,Organization/2,Organization/2,Organization/2,Organization/2
originalPrescription_reference,http://pharmacy.org/MedicationRequest/AB1202B,http://pharmacy.org/MedicationRequest/AB1202B,http://pharmacy.org/MedicationRequest/AB1202B,http://pharmacy.org/MedicationRequest/AB1202B,http://pharmacy.org/MedicationRequest/AB1202B
patient_reference,Patient/1,Patient/1,Patient/1,Patient/1,Patient/1
prescription_reference,http://pharmacy.org/MedicationRequest/AB1234G,http://pharmacy.org/MedicationRequest/AB1234G,http://pharmacy.org/MedicationRequest/AB1234G,http://pharmacy.org/MedicationRequest/AB1234G,http://pharmacy.org/MedicationRequest/AB1234G
provider_reference,Organization/1,Organization/1,Organization/1,Organization/1,Organization/1


SAVING THE FLATTENED PHARMACY DATA FROM S3 TO A HIVE TABLE

In [52]:
# Persist the flattened pharmacy data as an ORC table, replacing any previous version.
field_info_df.write.saveAsTable("dev_usr.pharmacy_information",
                                format="orc", mode="overwrite")

In [53]:
# Read the saved pharmacy table back through Hive and display it to verify the write.
pharmacy_table_query = "select * from dev_usr.pharmacy_information"
show_info = spark.sql(pharmacy_table_query)
show_info.show(n=20, truncate=True)

+----------+------+------------+------+-----+-----------------+------------------------------+-----------------+----------------------+------------------+--------------------+-----------+--------------+-----------+-----------------+------------------+--------------------+----------------+---------------+------------------+-------------+-----------------+-----------------------+---------------------+------------------------+--------------------+----------------+--------------------+---------------------------+----------------------------+-----------------+--------------+------------------+--------------------+------------------+-------------------+----------------------------------+--------------------+----------------------+----------------------------------------------+---------------------------------+------------------------------------+-----------------------------------+-----------------------------------+-------------------------------+---------------------------------+----------