## How to run this notebook

1. Download the Docker image: `docker pull jupyter/pyspark-notebook`

2. Make sure you have the input file `umls_cui_in_titles.txt`, which contains the UMLS IDs for each title and is produced by running `get_ids_from_abs.py`.
 
3. Start the PySpark Jupyter notebook by running the Docker container with the data directory mounted as a volume (replace the host path before the colon with the directory that holds your data):
   - `docker run -it -p 8888:8888 -v /Users/slin/covid_nlp/title_result:/mnt/result jupyter/pyspark-notebook`

4. Go to `http://localhost:8888` in a browser. It will ask for a token and a password. The token is printed in the console that is running the notebook. The password can be anything.

5. Import this notebook file into the Docker container.

See more instructions here: https://levelup.gitconnected.com/using-docker-and-pyspark-134cd4cab867

In [180]:
import os
os.listdir('/mnt/result') 

['100k_200k',
 '1_100k',
 '200k_345k',
 'concept_counts',
 'concept_map',
 'count',
 'count_sorted',
 'umls_cui_in_titles.txt']
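
The listing above is a manual check that the volume was mounted correctly. The same check can be scripted. This is a minimal sketch, assuming the mount point `/mnt/result` from step 3. The helper name `missing_inputs` is hypothetical and not part of the original notebook; `concept_map` is included because a later cell reads it.

```python
import os

def missing_inputs(directory, required):
    """Return the required entries that are absent from `directory`, sorted."""
    present = set(os.listdir(directory)) if os.path.isdir(directory) else set()
    return sorted(set(required) - present)

# Hypothetical usage; both names are read by later cells in this notebook.
required = ["umls_cui_in_titles.txt", "concept_map"]
print(missing_inputs("/mnt/result", required))  # an empty list means nothing is missing
```

If the directory itself is missing (for example, because the `-v` flag was left out), every required name is reported as missing.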

In [146]:
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [None]:
sc = SparkContext("local","Find number of occurrences of concepts")

In [179]:
# each line of umls_cui_in_titles.txt lists the UMLS CUIs found in one title, separated by commas; there are ~138k titles
words = sc.textFile("/mnt/result/umls_cui_in_titles.txt").flatMap(lambda line: line.split(","))

In [150]:
words.take(5)

['C3714514', 'C0948075', 'C2242472', 'C0009450', 'C0699744']

In [151]:
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [152]:
wordCounts.take(1)

[('C3714514', 12513)]
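
For a quick sanity check without Spark, the same counting logic can be reproduced on a few lines in plain Python. This is only a sketch: the sample lines are made up for illustration (they reuse CUIs shown above but are not taken from the real file), and `count_cuis` is a hypothetical helper.

```python
from collections import Counter

def count_cuis(lines):
    """Count CUI occurrences across comma-separated lines.

    Small-scale counterpart of flatMap(split) -> map((cui, 1)) -> reduceByKey(add).
    """
    counts = Counter()
    for line in lines:
        # strip() drops a trailing newline if the lines come from a plain file read
        counts.update(line.strip().split(","))
    return counts

# Made-up sample: one line per title, CUIs separated by commas.
sample_lines = ["C3714514,C0948075", "C3714514,C0009450", "C0009450"]
print(count_cuis(sample_lines).most_common())
```

Running this on the first few lines of the real file and comparing against `wordCounts` filtered to the same CUIs is one way to confirm that the split delimiter is right.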

In [19]:
wordCounts.saveAsTextFile("/mnt/result/count") # saving intermediate file

In [153]:
wordCounts

PythonRDD[284] at RDD at PythonRDD.scala:53

In [157]:
# sort the result just to get an idea
counts_sorted = wordCounts.sortBy(lambda item: item[1], ascending=False)
counts_sorted.saveAsTextFile("/mnt/result/count_sorted")


In [158]:
counts_sorted.take(10)

[('C0009450', 13686),
 ('C0042769', 13094),
 ('C3714514', 12513),
 ('C0206419', 12111),
 ('C0948075', 11369),
 ('C0010078', 11191),
 ('C0206423', 10145),
 ('C1550587', 9993),
 ('C1556682', 9782),
 ('C1175743', 7511)]

These are the ten most frequent CUIs in the file. C0009450 means "communicable diseases" and C0042769 means "virus disease", which makes sense.

In [159]:
concept_maps = sc.textFile("/mnt/result/concept_map").map(lambda line: line.split(","))

In [160]:
concept_maps.take(1)

[['22274', 'C0027651', 'C2981607', 'C1882062', 'C1368871', 'C0026640']]

In this file, each line starts with a concept id followed by its related CUIs (the strings that start with "C"). Using the per-CUI counts in `wordCounts`, we need to obtain a count for each concept id.
The relationship is not 1-to-1: a concept id has several CUIs, and a CUI can belong to multiple concepts. The best data structure I can think of is a map from each CUI to a list of concept ids, plus a second map that keeps the count for each concept.
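
To make the two-map idea concrete, here is a minimal plain-Python sketch. It is not the code the notebook runs (the cells below do the same thing with RDDs); the function names are hypothetical, and the toy ids are the same ones used in the testing ground at the end of this notebook.

```python
from collections import defaultdict

def build_cui_to_concepts(concept_rows):
    """Map each CUI to the list of concept ids whose row contains it.

    Each row looks like [concept_id, cui_1, cui_2, ...].
    """
    cui_to_concepts = defaultdict(list)
    for concept_id, *cuis in concept_rows:
        for cui in cuis:
            cui_to_concepts[cui].append(concept_id)
    return cui_to_concepts

def count_concepts(cui_counts, cui_to_concepts):
    """Add each CUI's count to every concept that contains that CUI."""
    concept_counts = defaultdict(int)
    for cui, count in cui_counts.items():
        # CUIs that belong to no concept contribute nothing
        for concept_id in cui_to_concepts.get(cui, []):
            concept_counts[concept_id] += count
    return dict(concept_counts)

# Toy data: cui_a belongs to two concepts, cui_c to none.
rows = [["cpt1", "cui_a", "cui_b"], ["cpt2", "cui_a"]]
cui_counts = {"cui_a": 2, "cui_b": 3, "cui_c": 5}
print(count_concepts(cui_counts, build_cui_to_concepts(rows)))
# prints {'cpt1': 5, 'cpt2': 2}
```

Note that a CUI shared by several concepts adds its full count to each of them, so the concept counts can sum to more than the CUI counts.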

In [161]:
# convert from ['22274', 'C0027651', 'C2981607', 'C1882062', 'C1368871', 'C0026640'] to
# ('22274', ['C0027651', 'C2981607', 'C1882062', 'C1368871', 'C0026640']), i.e. (concept_id, [CUIs])

concept_maps = concept_maps.map(lambda line: (line[0], line[1:]))

In [162]:
concept_maps.take(1)

[('22274', ['C0027651', 'C2981607', 'C1882062', 'C1368871', 'C0026640'])]

In [163]:
def convert_to_tuple_list(entry):
    # entry is (concept_id, [cui, ...]); emit one (cui, concept_id) pair per CUI
    concept_id = entry[0]
    return [(cui, concept_id) for cui in entry[1]]

concept_rdd = concept_maps.flatMap(convert_to_tuple_list)

In [164]:
concept_rdd.take(10)

[('C0027651', '22274'),
 ('C2981607', '22274'),
 ('C1882062', '22274'),
 ('C1368871', '22274'),
 ('C0026640', '22274'),
 ('C0002895', '22281'),
 ('C2699300', '22281'),
 ('C1260595', '22281'),
 ('C0750151', '22281'),
 ('C3273373', '22281')]

In [165]:
wordCounts.take(10)

[('C3714514', 12513),
 ('C0948075', 11369),
 ('C2242472', 1540),
 ('C0009450', 13686),
 ('C0699744', 1634),
 ('C3810607', 271),
 ('C0028128', 287),
 ('C0028215', 221),
 ('C0600437', 199),
 ('C2610645', 199)]

In [166]:
joined_result = concept_rdd.join(wordCounts)
joined_result.take(2)

[('C2981607', ('22274', 6)), ('C2981607', ('24602', 6))]

In [169]:
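# drop the CUI key, keeping only (concept_id, count)
joined_result = joined_result.map(lambda x: x[1])
# add up the counts of all CUIs that belong to the same concept
joined_result = joined_result.groupByKey().mapValues(sum)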
joined_result.take(2)

[('997899', 4), ('997898', 4)]

In [170]:
# save the sorted result
joined_result = joined_result.sortBy(lambda item: item[1], ascending=False)
joined_result.saveAsTextFile("/mnt/result/concept_counts")

In [171]:
joined_result.count()

9006

In [181]:
# split the counts into 20 equal-width buckets: most concepts (8429 of 9006) fall into the first bucket
joined_result.map(lambda x: x[1]).histogram(20) 

([1.0,
  2926.35,
  5851.7,
  8777.05,
  11702.4,
  14627.75,
  17553.1,
  20478.45,
  23403.8,
  26329.149999999998,
  29254.5,
  32179.85,
  35105.2,
  38030.549999999996,
  40955.9,
  43881.25,
  46806.6,
  49731.95,
  52657.299999999996,
  55582.65,
  58508],
 [8429, 400, 30, 47, 10, 7, 2, 25, 2, 0, 9, 1, 1, 2, 0, 0, 0, 0, 0, 41])
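
The bucket edges in the output above are evenly spaced between the smallest and the largest count. The arithmetic can be sketched in plain Python. This is not Spark's own implementation; `equal_width_histogram` is a hypothetical helper, and the sample values are made up. The last bucket is treated as closed so that the maximum value is counted, which is how the `RDD.histogram` documentation describes its buckets.

```python
def equal_width_histogram(values, num_buckets):
    """Return (edges, counts) for equal-width buckets between min and max."""
    lo, hi = min(values), max(values)
    if lo == hi:
        # All values identical: a single bucket holds everything.
        return [lo, hi], [len(values)]
    width = (hi - lo) / num_buckets
    edges = [lo + i * width for i in range(num_buckets)] + [hi]
    counts = [0] * num_buckets
    for v in values:
        # Clamp so the maximum value falls in the last (closed) bucket.
        index = min(int((v - lo) / width), num_buckets - 1)
        counts[index] += 1
    return edges, counts

# Made-up counts, skewed towards small values like the real data.
edges, counts = equal_width_histogram([1, 2, 3, 4, 50, 101], 4)
print(edges)   # [1.0, 26.0, 51.0, 76.0, 101]
print(counts)  # [4, 1, 0, 1]
```

With the notebook's minimum of 1 and maximum of 58508, the bucket width is (58508 - 1) / 20 = 2925.35, which gives the second edge 2926.35 seen in the output.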

### Testing ground
Below is a sandbox where I test whether the code works as expected. Feel free to ignore :)

In [140]:
x = sc.parallelize([("cui_a", 'cpt1'), ("cui_b", 'cpt1'), ("cui_a", 'cpt2')])  # like `concept_rdd` (umls_cui, concept_id)
y = sc.parallelize([("cui_a", 2), ("cui_b", 3), ("cui_c", 5)]) # like `wordCounts` (umls_cui, count)
z = x.join(y)
z.collect()

[('cui_a', ('cpt1', 2)), ('cui_a', ('cpt2', 2)), ('cui_b', ('cpt1', 3))]

In [141]:
temp = z.map(lambda x: x[1])
temp.collect()

[('cpt1', 2), ('cpt2', 2), ('cpt1', 3)]

In [142]:
aggregated_counts = temp.groupByKey().mapValues(sum)
aggregated_counts.collect()

[('cpt1', 5), ('cpt2', 2)]