In [195]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [197]:
# Start a spark session
spark = SparkSession.builder.appName('Paper_qty').getOrCreate()

In [203]:
# Read in csv
dataframe_1 = spark.read.format("csv").option("header", "true").load("group_1.csv")
dataframe_1=dataframe_1.withColumn("source_label", when(dataframe_1["label"]!="0","0" ).otherwise(dataframe_1["label"]))

dataframe_2 = spark.read.format("csv").option("header", "true").load("group_2.csv")
dataframe_2=dataframe_2.withColumn("source_label", when(dataframe_2["label"]!="1","1" ).otherwise(dataframe_2["label"]))


dataframe_3 = spark.read.format("csv").option("header", "true").load("group_3.csv")
dataframe_3=dataframe_3.withColumn("source_label", when(dataframe_3["label"]!="2","2" ).otherwise(dataframe_3["label"]))


# #combine or append the dataframes
df=dataframe_1.union(dataframe_2).union(dataframe_3)
print("Number of records: " + str(df.count()))
df.show()

Number of records: 45028
+---+--------+--------------------+--------------------+-------+-----+------------+
|_c0|    pmid|               title|            abstract|journal|label|source_label|
+---+--------+--------------------+--------------------+-------+-----+------------+
| 32|29590094|Quantized Majoran...|Majorana zero-mod...| nature|    0|           0|
| 33|29590093|The logic of sing...|Neocortical areas...| nature|    0|           0|
| 34|29590092|Itaconate is an a...|The endogenous me...| nature|    0|           0|
| 35|29590091|A new class of sy...|A challenge in th...| nature|    0|           0|
| 36|29590090|Architecture of t...|Nutrients, such a...| nature|    0|           0|
| 37|29590089|Whole-organism cl...|Embryonic develop...| nature|    0|           0|
| 38|29590088|Structure of the ...|The shape, elonga...| nature|    0|           0|
| 39|29579743|Room-temperature ...|Room-temperature ...| nature|    0|           0|
| 40|29562235|Shifts in tree fu...|Forests have a k

In [205]:
df.filter(df['pmid']=='28953878').select("abstract","label","source_label").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------+------------+
|abstract                                                                                                                                                                                                                                                                                                                                                                          

In [210]:
df.groupBy("label").count().show(5)

+--------------------+-----+
|               label|count|
+--------------------+-----+
| and response to ...|    1|
| the differences ...|    1|
|                SOX2|    1|
|            habitual|    1|
|     area patterning|    1|
+--------------------+-----+
only showing top 5 rows



In [207]:
df.groupBy("source_label").count().show()

+------------+-----+
|source_label|count|
+------------+-----+
|           0|13611|
|           1|14597|
|           2|16820|
+------------+-----+



In [212]:
#wrong labels
print("Total # of Wrongly labeled features: " + str(df.select('label').distinct().count()-2))
df.select('abstract','label').distinct().filter(df.label.isin(["0","1","2"])==False).show(3,truncate=False)

Total # of Wrongly labeled features: 1652
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [215]:
df.select('abstract','label','source_label').distinct().filter(df.label.isin(["0","1","2"])==False).show(3)

+--------------------+--------------------+------------+
|            abstract|               label|source_label|
+--------------------+--------------------+------------+
|"The global geody...| which implies th...|           0|
|"When deformed be...| as well as two m...|           0|
|"The synaptic mec...| and suggest that...|           1|
+--------------------+--------------------+------------+
only showing top 3 rows



In [223]:
# df.createTempView("Publ")
df2=spark.sql("SELECT 'pmid', 'title', concat('abstract','label'), 'journal',  'source_label' FROM Publ")
df2.show()

+----+-----+-----------------------+-------+------------+
|pmid|title|concat(abstract, label)|journal|source_label|
+----+-----+-----------------------+-------+------------+
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|          abstractlabel|journal|source_label|
|pmid|title|  

In [20]:
# Tokenize dataframe
tokened = Tokenizer(inputCol="journal", outputCol="words")
tokened_transformed = tokened.transform(dataframe_1)
tokened_transformed.count()

13611

In [5]:
# Remove stop words
stop_list = ["@VirginAmerica", "$30", "@virginamerica"]
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_list)
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------------

In [6]:
# Run the hashing term frequency
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,4))

# Transform into a DF
hashed_df = hashing.transform(removed_frame)
hashed_df.show()

+--------------------+--------------------+--------------------+--------------------+
|      Airline Tweets|               words|            filtered|        hashedValues|
+--------------------+--------------------+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|[plus, you've, ad...|(16,[3,4,5,7,8,9,...|
|@VirginAmerica se...|[@virginamerica, ...|[seriously, would...|(16,[0,1,2,3,4,9,...|
|@VirginAmerica do...|[@virginamerica, ...|[do, you, miss, m...|(16,[0,1,8,10,11,...|
|@VirginAmerica Ar...|[@virginamerica, ...|[are, the, hours,...|(16,[0,1,2,4,7,9,...|
|@VirginAmerica aw...|[@virginamerica, ...|[awaiting, my, re...|(16,[0,3,4,6,7,8,...|
+--------------------+--------------------+--------------------+--------------------+



In [7]:
# Fit the IDF on the data set 
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

In [8]:
# Display the dataframe
rescaledData.select("words", "features").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                          |features                                                                                                                                                                                                             |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------

In [9]:
# Stop Spark
spark.stop()