In [92]:
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from datetime import datetime
import json
from pyspark.sql.types import *
from datetime import timedelta
from operator import add

In [134]:
import nltk
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.ml.linalg import Vector, Vectors
from pyspark.ml.clustering import LDA, LDAModel

In [2]:
# functions
def find_keywords(row):
    k1 = row.select('keywords').rdd
    k1 = k1.map(lambda x: x.keywords.strip('[').strip(']')).filter(lambda x: len(x) > 0).filter(lambda x: x != 'nan')
    k1 = k1.flatMap(lambda x: x.split(','))
    k1 = k1.map(lambda x : (x.strip(' ').strip("'").lower(),1))
    kk1 = k1.reduceByKey(add).sortBy(lambda x: x[1] ,ascending =False)
    return kk1.collect()

def find_journal(row): 
    j1 = row.select('journal').rdd
    j1 = j1.map(lambda x: x.journal.strip('[').strip(']')).filter(lambda x: len(x) > 0).filter(lambda x: x != 'nan')
    j1 = j1.map(lambda x : (x,1))
    j1 = j1.reduceByKey(add).sortBy(lambda x: x[1] ,ascending =False)
    return j1.collect()

In [None]:
# transform csv file to json file

pd_cancer = pd.read_csv('cancer_total.csv')
pd_cancer = pd_cancer.drop(list(pd_cancer.columns[:2]), axis=1)

info_dict = {'pubmed_id', 'authors', 'keywords','abstract','publication_date','title','results','doi', 'journal'}
index_test = 0
file_name =0

total= []

for i in range(len(pd_cancer['pubmed_id'])):
    test_dict = {}
   
    for key in list(info_dict):
        test_dict[key] = str(pd_cancer[key][i])
     
    total.append(test_dict)
    index_test += 1 
    if index_test == 151500:
        with open(f'cjson/c_{file_name}.json', 'w') as f:
            json.dump(total, f)
        print(index_test)
        file_name +=1
        
        index_test = 0
        total= []    

In [3]:
spark = SparkSession.builder.appName("t_cancer").getOrCreate()

In [35]:
schema = StructType([StructField("pubmed_id", StringType(), True), StructField("authors", StringType(), True), StructField("keywords", StringType(), True),
                    StructField("abstract", StringType(), True),StructField("publication_date", DateType(), True),StructField("title", StringType(), True),
                    StructField("results", StringType(), True),StructField("doi", StringType(), True),StructField("journal", StringType(), True)])

total_cancer = spark.read.json(f'gs://project_rong/cancer/cjson/c_0.json', schema=schema)
for i in range(1,10): 
    x = spark.read.json(f'gs://project_rong/cancer/cjson/c_{i}.json', schema=schema)
    total_cancer = total_cancer.union(x)

In [None]:
time_dict = {}

for i in range(2009, 2019):
    time_period = total_cancer.filter(total_cancer['publication_date'] >= datetime(i,1,1)).filter(total_cancer['publication_date'] <= datetime(i+1,1,1))
    time_dict[i] = {}
    time_dict[i]['total_articles'] = time_period.count()
    time_dict[i]['keywords'] = find_keywords(time_period)[:20]
    time_dict[i]['journal'] = find_journal(time_period)[:20]

    
with open('total_cancer.json', 'w') as f:
    json.dump(time_dict, f)
    
! gsutil cp total_cancer.json gs://project_rong/cancer/total_cancer.json

In [50]:
 # LDA model 
sp1 = total_cancer.select('abstract').rdd
sp2 = sp1.filter(lambda x:  x.abstract != 'nan')
tokens = sp2.map( lambda x: x.abstract.strip().lower()).map( lambda x: x.replace('\n' , ''))\
.map( lambda x: x.replace('\t' , '')).map( lambda abstract: re.split(" ", abstract))\
.map( lambda word: [x for x in word if x.isalpha()])\
.map(lambda word: [x for x in word if len(x) >= 3])\
.map( lambda word: [x for x in word if x not in StopWords]).zipWithIndex()

In [None]:
sqlContext = SQLContext(spark)
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words",'index'])

cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)

# IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv) 

# choosenumber of topics
num_topics = 10
max_iterations = 100
lda = LDA(k=num_topics, maxIter=max_iterations)
lda_model = lda.fit(result_tfidf)


topics = lda_model.describeTopics(10)
ax = lda_model.describeTopics(wordNumbers).rdd
termindex = ax.map(lambda x: x.termIndices)
term_weight = ax.map(lambda x: x.termWeights)

In [114]:
# words in topics
results = []
for each_row in termindex.collect():
    row_word = []
    for item in each_row: 
        word = cvmodel.vocabulary[item]
        row_word.append(word)
    results.append(row_word)


In [None]:
test_result = {'test': results, 'weight':term_weight.collect()}

with open ('lda_model_1211.json', 'w') as f:
       json.dump(test_result , f)
        
! gsutil cp lda_model_1211.json gs://project_rong/lda_model_1211.json

In [None]:
# result_tfidf[['index','features']].show()

In [253]:
# result_tfidf[['list_of_words', 'features']].show()
# result_cv.show()

In [130]:
result_tfidf[['index','features']].rdd.map(list).take(1)

[[0,
  SparseVector(5000, {2: 1.1589, 5: 1.6701, 22: 1.7419, 26: 1.8097, 33: 2.154, 34: 2.1329, 60: 4.3026, 63: 4.7983, 74: 2.2263, 91: 2.794, 101: 2.2473, 112: 2.5541, 155: 2.8054, 163: 2.8796, 168: 2.9822, 174: 2.6956, 179: 2.7365, 181: 2.6945, 187: 2.6608, 193: 2.8538, 208: 2.9898, 224: 5.5805, 287: 3.2008, 316: 3.0337, 328: 3.2714, 342: 3.3235, 347: 3.1839, 374: 6.5963, 405: 3.2816, 448: 3.3718, 518: 3.5949, 561: 3.7863, 588: 3.6034, 605: 3.6468, 617: 4.0361, 634: 3.8244, 682: 4.2228, 714: 3.8458, 826: 3.9851, 916: 3.9727, 984: 8.6447, 1069: 9.2831, 1507: 4.5646, 1555: 4.7337, 1824: 4.8351, 1937: 4.7603, 2902: 5.359, 3084: 5.8063, 3094: 5.6197, 3198: 5.4625, 3652: 5.8063})]]

In [None]:
num_topics = 10
max_iterations = 100
lda = LDA(k=num_topics, maxIter=max_iterations)
lda_model = lda.fit(result_tfidf)
# lda_model = LDA.fit(result_tfidf[['index','features']].rdd.map(list), k=num_topics, maxIterations=max_iterations)

In [None]:
# lda.save('s1lda')
topics = lda_model.describeTopics(10)
topics.show(truncate=False)

In [240]:
wordNumbers = 10

In [161]:
sc = spark.sparkContext

In [250]:
ax = lda_model.describeTopics(wordNumbers).rdd
x12 = ax.map(lambda x: x.termIndices)
term_weight = ax.map(lambda x: x.termWeights)

x12.collect()
term_weight.collect()


[[0.01086313526456371,
  0.008521772761877369,
  0.007667098108510024,
  0.006679464117423838,
  0.006635663372845226,
  0.006210634351912754,
  0.005956553463990851,
  0.005814040440574243,
  0.005508618321781763,
  0.004705677390302103],
 [0.010515908222162513,
  0.010016743546634016,
  0.009979219991289982,
  0.005923364473632306,
  0.005526009465707872,
  0.005341963195986004,
  0.005298643375031896,
  0.004807256895743788,
  0.0046855985148840755,
  0.004351431250237673],
 [0.006533760887340074,
  0.0061429398921155004,
  0.005975414080958479,
  0.005840504657413391,
  0.004951230894701462,
  0.004438720889458821,
  0.0037564882544591223,
  0.003704854541068814,
  0.0034352227537835325,
  0.0034111691233958573],
 [0.012678995296530032,
  0.009290218645934165,
  0.006864402696410305,
  0.0064190846937411975,
  0.004921527657685131,
  0.004592252303495899,
  0.004579245104193475,
  0.0044595838394146025,
  0.004443179476983699,
  0.004194450304107405],
 [0.012674354853201281,
  0.00

In [188]:
topicIndices = lda_model.describeTopics(wordNumbers).rdd

In [242]:
results = []
for each_row in x12.collect():
    row_word = []
    for item in each_row: 
        word = cvmodel.vocabulary[item]
        row_word.append(word)
    results.append(row_word)
print(results)

[['dose', 'brain', 'radiation', 'imaging', 'mm', 'images', 'glioma', 'volume', 'image', 'plasma'], ['dna', 'hpv', 'cells', 'cell', 'leukemia', 'damage', 'human', 'repair', 'infection', 'viral'], ['health', 'cancer', 'risk', 'women', 'screening', 'mortality', 'use', 'among', 'studies', 'breast'], ['expression', 'cell', 'cells', 'protein', 'signaling', 'growth', 'apoptosis', 'gene', 'activation', 'role'], ['patients', 'survival', 'lymph', 'stage', 'p', 'surgery', 'prognostic', 'node', 'recurrence', 'group'], ['case', 'diagnosis', 'lesions', 'liver', 'patients', 'patient', 'cases', 'bone', 'malignant', 'rare'], ['cells', 'tumor', 'melanoma', 'mice', 'immune', 'molecular', 'stem', 'endothelial', 'cell', 'growth'], ['breast', 'risk', 'cervical', 'samples', 'cases', 'genetic', 'association', 'women', 'sensitivity', 'mutation'], ['patients', 'thyroid', 'chemotherapy', 'response', 'treatment', 'median', 'toxicity', 'bladder', 'dose', 'survival'], ['prostate', 'quality', 'patients', 'life', 'ca

In [251]:
test_result = {'test': results, 'weight':term_weight.collect()}

with open ('lda_model_1211.json', 'w') as f:
       json.dump(test_result , f)

In [252]:
! gsutil cp lda_model_1211.json gs://project_rong/lda_model_1211.json

Copying file://lda_model_1211.json [Content-Type=application/json]...
/ [1 files][  3.2 KiB/  3.2 KiB]                                                
Operation completed over 1 objects/3.2 KiB.                                      


In [187]:
transformed.select('raw_features').take(1)

StopWords = stopwords.words("english")
StopWords

[Row(raw_features=SparseVector(5000, {2: 1.0, 5: 1.0, 22: 1.0, 26: 1.0, 33: 1.0, 34: 1.0, 60: 2.0, 63: 2.0, 74: 1.0, 91: 1.0, 101: 1.0, 112: 1.0, 155: 1.0, 163: 1.0, 168: 1.0, 174: 1.0, 179: 1.0, 181: 1.0, 187: 1.0, 193: 1.0, 208: 1.0, 224: 2.0, 287: 1.0, 316: 1.0, 328: 1.0, 342: 1.0, 347: 1.0, 374: 2.0, 405: 1.0, 448: 1.0, 518: 1.0, 561: 1.0, 588: 1.0, 605: 1.0, 617: 1.0, 634: 1.0, 682: 1.0, 714: 1.0, 826: 1.0, 916: 1.0, 984: 2.0, 1069: 2.0, 1507: 1.0, 1555: 1.0, 1824: 1.0, 1937: 1.0, 2902: 1.0, 3084: 1.0, 3094: 1.0, 3198: 1.0, 3652: 1.0}))]

In [None]:


all_keywords = set()
for time in time_dict.keys(): 
    for i in time_dict[time]['keywords']:
        all_keywords.add(i[0])
all_keywords  


xx1 = spark.read.json(f'gs://project_rong/cancer/cjson/c_1.json', schema=schema)
time_1 = xx1.filter(xx1['publication_date'] > datetime(2010,1,1)).filter(xx1['publication_date'] < datetime(2011,1,1))
time_1.count()


time_dict = {}
# xx1.select('*').orderBy('publication_date', ascending = False).show()

for i in range(2009, 2019):
    time_period = xx1.filter(xx1['publication_date'] >= datetime(i,1,1)).filter(xx1['publication_date'] <= datetime(i+1,1,1))
    time_dict[i] = {}
    time_dict[i]['total_articles'] = time_period.count()
    time_dict[i]['keywords'] = find_keywords(time_period)[:20]
#     for each_key in find_keywords(time_period)[:11]:
#         time_dict[i]['keywords'][each_key][0] =each_key[1]
    time_dict[i]['journal'] = find_journal(time_period)[:20]
    
print(time_dict)

In [None]:
# test
# tokens = reviews.map( lambda document: document.strip().lower()).map( lambda document: re.split(" ", document))          \
#     .map( lambda word: [x for x in word if x.isalpha()])           \
#     .map( lambda word: [x for x in word if len(x) > 3] )           \
#     .map( lambda word: [x for x in word if x not in StopWords])    \
#     .zipWithIndex()


# sp1 = time_1.select('abstract').rdd