In [1]:
from pyspark.sql import SparkSession, Row
import __credential__
from os import environ

# PySpark launcher settings: the PostgreSQL JDBC jar handed to spark-submit,
# plus the Python executables named for the workers and for the driver.
environ.update({
    'PYSPARK_SUBMIT_ARGS': '--jars /usr/local/spark/jars/postgresql-42.2.2.jar pyspark-shell',
    'PYSPARK_PYTHON': '/home/ubuntu/anaconda3/bin/python',
    'PYSPARK_DRIVER_PYTHON': '/home/ubuntu/anaconda3/bin/jupyter',
})

In [2]:
# Create the notebook's Spark session, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("meta_info")
    # .master(__credential__.spark_host)  -- explicit master left disabled
    .getOrCreate()
)

### Read meta data file

In [8]:
# Load the multi-line JSON metadata document from S3, register it for Spark SQL
# as `meta_view`, then print the inferred schema.
meta_json_path = 's3a://gdcdata/refs/files.c+r.json'
meta = spark.read.json(meta_json_path, multiLine=True)
meta.createOrReplaceTempView("meta_view")
meta.printSchema()

root
 |-- access: string (nullable = true)
 |-- annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotation_id: string (nullable = true)
 |-- cases: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- case_id: string (nullable = true)
 |    |    |-- project: struct (nullable = true)
 |    |    |    |-- project_id: string (nullable = true)
 |-- data_category: string (nullable = true)
 |-- data_format: string (nullable = true)
 |-- file_name: string (nullable = true)
 |-- file_size: long (nullable = true)



### Read manifest data file

In [92]:
# Read the tab-separated manifest (header row, inferred column types, quoting disabled).
manifest_raw = (
    spark.read
    .format("csv")
    .options(delimiter="\t", quote="", header="true", inferSchema="true")
    .load('s3a://gdcdata/refs/gdc_manifest.c+r.txt')
)
# Keep only the two columns needed downstream; the manifest's `id` is exposed as `path`.
manifest = manifest_raw.select(
    manifest_raw['id'].alias('path'),
    manifest_raw['filename'],
)
manifest.createOrReplaceTempView("manifest_view")
manifest.printSchema()

root
 |-- path: string (nullable = true)
 |-- filename: string (nullable = true)



### Join information from meta files and manifest files

In [27]:
# Join every manifest entry to its metadata record by file name.
#
# `manifest_view` exposes the manifest's original `id` column under the name
# `path` (it is renamed when the manifest is loaded), so it is selected as
# `path` here and aliased back to `id` to keep the resulting column names
# (id, filename, data_format, project_id, case_id) unchanged.
#
# `cases` is an array of structs, so `project_id` and `case_id` come back as
# arrays with one entry per case attached to the file.
index = spark.sql('''
        SELECT manifest_view.path AS id, manifest_view.filename, meta_view.data_format,
        meta_view.cases.project.project_id, meta_view.cases.case_id
        FROM manifest_view
        INNER JOIN meta_view ON manifest_view.filename = meta_view.file_name
        ''')
index.createOrReplaceTempView("index_view")
index.show(5)

+--------------------+--------------------+-----------+-----------+--------------------+
|                  id|            filename|data_format| project_id|             case_id|
+--------------------+--------------------+-----------+-----------+--------------------+
|9669a175-8199-4ef...|nationwidechildre...|    BCR XML|[TCGA-BRCA]|[8240c4ae-f878-48...|
|60aec852-6075-446...|148d950b-4202-4f3...|        TXT|[TCGA-BRCA]|[f3cb557d-23e4-4f...|
|8dc57eac-45aa-4af...|a2233404-f380-4bd...|        TXT|[TCGA-BRCA]|[ee5744c0-a8dc-43...|
|a9795a96-c066-467...|b2a6c9e3-65eb-43b...|        TXT|[TCGA-BRCA]|[c0b7b798-3383-4a...|
|7a02938c-644c-487...|99c7b545-c90e-407...|        TXT|[TCGA-BRCA]|[d8492ebd-3d94-4a...|
+--------------------+--------------------+-----------+-----------+--------------------+
only showing top 5 rows



### Split files and save to PostgreSQL

In [72]:
def psql_saver(df, tbname, savemode='error'):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC.

    Args:
        df: Spark DataFrame to persist.
        tbname: Destination table name, passed as the JDBC ``dbtable`` option.
        savemode: Save mode forwarded to ``DataFrameWriter.mode``
            (defaults to ``'error'``).

    The JDBC host, user and password are read from the ``__credential__`` module.
    """
    jdbc_url = 'jdbc:postgresql://%s' % __credential__.jdbc_accessible_host_psql
    # Write the DataFrame directly. Registering it as a temp view and selecting
    # everything back yields the same rows, but overwrites any session view
    # that happens to be called "view".
    (
        df.write
        .format('jdbc')
        .option('url', jdbc_url)
        .option('dbtable', tbname)
        .option('user', __credential__.user_psql)
        .option('password', __credential__.password_psql)
        .mode(savemode)
        .save()
    )

In [88]:
# Destination PostgreSQL table for each supported `data_format` value.
TableByFormat = {'BCR XML': 'xml_list', 'TXT': 'txt_list'}

# One record per supported format: its name plus the rows of `index` in that format.
files_groupby_types = [
    {'type': fmt, 'flist': index.filter(index.data_format == fmt)}
    for fmt in TableByFormat
]

# Overwrite each format's table with the matching subset of the index.
for files in files_groupby_types:
    target_table = TableByFormat[files['type']]
    print("Saving [%s] data to PostgreSQL table [%s]..." % (files['type'], target_table))
    psql_saver(files['flist'], target_table, 'overwrite')

Saving [BCR XML] data to PostgreSQL table [xml_list]...
Saving [TXT] data to PostgreSQL table [txt_list]...


### Unreadable files

In [73]:
# Rows whose data_format has no destination table in TableByFormat.
# The filter stays on the DataFrame so the schema of `index` is preserved;
# going through `index.rdd` and `toDF()` would re-infer the schema from row
# values. Rows with a null data_format are kept explicitly, because `isin`
# evaluates to null (not true) for them and they would otherwise be dropped.
unreadable = index.filter(
    index.data_format.isNull() | ~index.data_format.isin(list(TableByFormat))
)
# head(1) returns an empty list when nothing matches, so a full count is not needed.
if unreadable.head(1):
    print("Saving data in unkown foramt to PostgreSQL table: unknowns.")
    psql_saver(unreadable, 'unknowns', 'overwrite')

### Playground

In [None]:
# Scratch experiment: list the files whose data_format is not a key of a
# format-to-table mapping.
# A cell-local mapping is used so that running this cell does not overwrite the
# TableByFormat dictionary that the save cells rely on.
# NOTE(review): 'BCR ML' is kept exactly as written; confirm whether it was
# meant to be 'BCR XML' or is a deliberate non-matching key for testing.
playground_formats = {'BCR ML': 'xml_list', 'TXT': 'txt_list'}
# Reference pattern this cell was modelled on (`test` and `train` are not
# defined in this notebook, so it is kept as a comment only):
#   diff_cat_in_train_test = test.select('Product_ID').subtract(train.select('Product_ID'))
file_unreadable = index.filter(~index.data_format.isin(list(playground_formats)))
file_unreadable
#file_unreadable.count()

In [38]:
# Number of joined files per data_format.
# Uses `index`, the joined DataFrame built above; the name `info` that this
# cell previously referenced is not defined anywhere in the notebook.
t = index.groupBy("data_format").agg({"*": "count"})
t.show(5)

+-----------+--------+
|data_format|count(1)|
+-----------+--------+
|    BCR XML|    1097|
|        TXT|    3666|
+-----------+--------+



In [20]:
# Stop the Spark session created at the top of the notebook.
spark.stop()