In [1]:
facilities = spark \
  .read \
  .parquet("gs://daasnerds-cracircle/aact/etl/parquet/facilities")

In [None]:
facilities \
  .select("id", "state") \
  .distinct() \
  .groupBy("state") \
  .count() \
  .collect()

In [9]:
facilities.printSchema()

root
 |-- id: string (nullable = true)
 |-- nct_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- country: string (nullable = true)



In [12]:
facilitiesAgg = facilities \
  .groupBy("name", "city", "zip", "country") \
  .count().alias("count")

In [13]:
facilitiesAgg.printSchema()

root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = false)



## Analyze facility names for common patterns to eliminate anonymized sites

In [14]:
facilitiesRDD = facilitiesAgg.select("name").fillna('').rdd

In [15]:
facilitiesRDD.first()

Row(name=u'Meridian Clinical Research')

In [16]:
from pyspark.sql.functions import *

facilitiesWords = facilitiesRDD \
  .map(lambda row: row['name']) \
  .flatMap(lambda line: line.upper().split(" ")) \
  .map(lambda word: (word, 1)) \
  .reduceByKey(lambda a, b: a + b) \
  .toDF() \
  .filter(col('_1') != '')

In [17]:
facilitiesWords.count()

60030

In [18]:
facilitiesWords.orderBy(col("_2"), ascending=False).show(20)

+---------------+-----+
|             _1|   _2|
+---------------+-----+
|           SITE|72440|
|         CENTER|39453|
|INVESTIGATIONAL|37128|
|             OF|36384|
|       RESEARCH|30836|
|     UNIVERSITY|24752|
|        MEDICAL|24128|
|       HOSPITAL|15901|
|              -|14885|
|         CANCER|14814|
|            FOR|14688|
|  INVESTIGATIVE|13256|
|       CLINICAL|12801|
|            AND|11169|
|         HEALTH|11026|
|      INSTITUTE| 8604|
|      INGELHEIM| 8130|
|     BOEHRINGER| 8130|
|            THE| 6591|
|     ASSOCIATES| 6159|
+---------------+-----+
only showing top 20 rows



## Write output to pull for local analysis

In [24]:
facilitiesAgg \
  .repartition(1) \
  .write \
  .option("header", True) \
  .csv("gs://daasnerds-cracircle/aact/etl/csv/facilitiesAgg.csv")

In [None]:
facilitiesWords \
  .repartition(1) \
  .write \
  .option("header", True) \
  .csv("gs://daasnerds-cracircle/aact/etl/csv/facilitiesWords.csv")