In [1]:
# Display the effective configuration of the running SparkContext as (key, value) pairs.
(spark.sparkContext
      .getConf()
      .getAll())

[('spark.master', 'spark://128.235.40.174:7077'),
 ('spark.driver.port', '41147'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '10G'),
 ('spark.driver.extraClassPath', '/home/hao/pixiedust/data/libs/*'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.jars', 'file:/home/hao/pixiedust/bin/cloudant-spark-v2.0.0-185.jar'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '128.235.40.174'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'app-20171130102519-0008'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.memory', '4G')]

In [16]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

import time

# Load the five ontology / taxonomy extracts from HDFS. Each one is read with an
# explicit schema, de-duplicated, and registered as a temporary SQL view. All
# loads share the same reader settings, so they go through one helper.

# NOTE(review): the cluster address is hardcoded; consider reading it from config/env.
HDFS_BASE = "hdfs://128.235.40.174:9000"


def _load_csv_view(file_name, view_name, fields):
    """Read a headered CSV from HDFS_BASE with an explicit schema and register it as a view.

    Prints the row count as read (before de-duplication) and the schema. Then
    de-duplicates the rows, registers the result with
    ``createOrReplaceTempView(view_name)`` and returns it.

    Args:
        file_name: CSV file name, relative to HDFS_BASE.
        view_name: name of the temporary view to (re)register.
        fields: sequence of ``(column_name, spark_type)`` pairs, listed in the
            same order as the CSV columns.

    Returns:
        The de-duplicated DataFrame.
    """
    schema = StructType([StructField(name, dtype) for name, dtype in fields])
    df_raw = (spark.read
              .schema(schema)
              .option("header", "true")
              # Rows that do not fit the schema are dropped instead of failing the read.
              # NOTE(review): on some Spark versions count() may not apply this filter
              # when no columns are required -- verify raw counts if they matter.
              .option("mode", "DROPMALFORMED")
              .csv(HDFS_BASE + "/" + file_name))
    print(df_raw.count())
    df_raw.printSchema()
    df_distinct = df_raw.distinct()
    df_distinct.createOrReplaceTempView(view_name)
    return df_distinct


start_time = time.time()

# Class -> parent class edges, per ontology.
df_class_hier = _load_csv_view("ONT_CLASS_HIERARCHY.csv", "class_hier", [
    ("ONT_NAME", StringType()),
    ("CLASS_IRI", StringType()),
    ("PARENT_CLASS_IRI", StringType()),
])

# Human-readable label of each class.
df_class_labels = _load_csv_view("ONT_CLASS_LABELS.csv", "class_labels", [
    ("ONT_NAME", StringType()),
    ("CLASS_IRI", StringType()),
    ("CLASS_LABEL", StringType()),
])

# Taxonomy areas with their name and level.
df_tax_areas = _load_csv_view("TAX_AREAS.csv", "tax_areas", [
    ("ONT_NAME", StringType()),
    ("TAX_TYPE", StringType()),
    ("AREA_ID", StringType()),
    ("AREA_NAME", StringType()),
    ("AREA_LEVEL", IntegerType()),
])

# Class -> taxonomy area membership.
df_tax_areas_concepts = _load_csv_view("TAX_AREAS_CONCEPTS.csv", "tax_areas_concepts", [
    ("ONT_NAME", StringType()),
    ("TAX_TYPE", StringType()),
    ("AREA_ID", StringType()),
    ("CLASS_IRI", StringType()),
])

# Class -> partial-area root membership.
df_tax_areas_pareas = _load_csv_view("TAX_AREAS_PAREAS.csv", "tax_areas_pareas", [
    ("ONT_NAME", StringType()),
    ("TAX_TYPE", StringType()),
    ("PAREA_ROOT_IRI", StringType()),
    ("CLASS_IRI", StringType()),
])

# Keep the module-level `schema` bound to the last loaded schema, as this cell
# always has, in case later cells refer to it.
schema = df_tax_areas_pareas.schema

print(time.time() - start_time, "seconds")

# Build the DataFrames used downstream from the loaded tables:
#   df_hier  -- class -> parent edges (src, dst, ont)
#   df_union -- one row per class / area / partial-area combination, with label
#               and area metadata
#
# Columns are picked by NAME and renamed with toDF(*cols). The previous
# positional rdd.map(lambda x: (x[0], ...)) approach depended on the column order
# each join happens to produce. Staying in the DataFrame API also keeps the
# declared schema (AREA_LEVEL stays an integer) instead of re-inferring it from
# Python rows. That inference can fail when a column is null in every sampled
# row, and it ships every row through Python.

df_hier = (df_class_hier
           .select("CLASS_IRI", "PARENT_CLASS_IRI", "ONT_NAME")
           .toDF("src", "dst", "ont")
           .distinct())

# The joins below are inner joins (DataFrame.join default). Classes with no
# label, area, or partial area are therefore dropped from the result.

# Attach the taxonomy area each labelled class belongs to.
result = df_class_labels.join(df_tax_areas_concepts, ['CLASS_IRI', 'ONT_NAME'])
result.printSchema()

# Attach the area's name and level.
result = result.join(df_tax_areas, ['ONT_NAME', 'TAX_TYPE', 'AREA_ID'])
result.printSchema()

# Attach the partial-area root(s) of each class.
result = result.join(df_tax_areas_pareas, ['CLASS_IRI', 'ONT_NAME', 'TAX_TYPE'])
result.printSchema()

df_union = (result
            .select("CLASS_IRI", "ONT_NAME", "TAX_TYPE", "AREA_ID",
                    "CLASS_LABEL", "AREA_NAME", "AREA_LEVEL", "PAREA_ROOT_IRI")
            .toDF("id", "ont", "type", "area_id",
                  "label", "area_name", "area_level", "parea_root_id")
            .distinct())

# No Spark action runs in this cell, so this elapsed time does not include
# executing the joins; they run lazily when df_union / df_hier are first used.
print(time.time() - start_time, "seconds")

809619
root
 |-- ONT_NAME: string (nullable = true)
 |-- CLASS_IRI: string (nullable = true)
 |-- PARENT_CLASS_IRI: string (nullable = true)

711444
root
 |-- ONT_NAME: string (nullable = true)
 |-- CLASS_IRI: string (nullable = true)
 |-- CLASS_LABEL: string (nullable = true)

1258
root
 |-- ONT_NAME: string (nullable = true)
 |-- TAX_TYPE: string (nullable = true)
 |-- AREA_ID: string (nullable = true)
 |-- AREA_NAME: string (nullable = true)
 |-- AREA_LEVEL: integer (nullable = true)

343737
root
 |-- ONT_NAME: string (nullable = true)
 |-- TAX_TYPE: string (nullable = true)
 |-- AREA_ID: string (nullable = true)
 |-- CLASS_IRI: string (nullable = true)

381471
root
 |-- ONT_NAME: string (nullable = true)
 |-- TAX_TYPE: string (nullable = true)
 |-- PAREA_ROOT_IRI: string (nullable = true)
 |-- CLASS_IRI: string (nullable = true)

2.7048428058624268 seconds
root
 |-- CLASS_IRI: string (nullable = true)
 |-- ONT_NAME: string (nullable = true)
 |-- CLASS_LABEL: string (nullable = true