## Import data into pySpark
- https://github.com/titipata/pubmed_parser/wiki/Download-and-preprocess-MEDLINE-dataset

- MEDLINE BULK DOWNLOAD -
wget ftp://ftp.ncbi.nlm.nih.gov/pubmed/baseline/*.gz

- MEDLINE UPDATES -
wget ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/*.gz
- Change JAVA version: https://kodejava.org/how-do-i-set-the-default-java-jdk-version-on-mac-os-x/

In [1]:
import os
from glob import glob
import pubmed_parser as pp
import findspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row

findspark.init()

In [2]:
conf = SparkConf()
spark = SparkSession.builder.\
    config(conf=conf).\
    getOrCreate()

### Process medline files
- https://github.com/titipata/pubmed_parser/wiki/Download-and-preprocess-MEDLINE-dataset

In [3]:
medline_files_rdd = spark.sparkContext.parallelize(glob('../data/pubmed/*.gz'), numSlices=1000)

In [4]:
parse_results_rdd = medline_files_rdd.\
    flatMap(lambda x: [Row(file_name=os.path.basename(x), **publication_dict) 
                       for publication_dict in pp.parse_medline_xml(x)])

In [5]:
medline_df = parse_results_rdd.toDF()

In [8]:
medline_df.columns

['abstract',
 'affiliations',
 'authors',
 'chemical_list',
 'country',
 'delete',
 'doi',
 'file_name',
 'issn_linking',
 'journal',
 'keywords',
 'medline_ta',
 'mesh_terms',
 'nlm_unique_id',
 'other_id',
 'pmc',
 'pmid',
 'pubdate',
 'publication_types',
 'references',
 'title']

## Create view, run sql queries

In [9]:
# Create view as temporary table
medline_df.createOrReplaceTempView("table")

In [14]:
# Find distint countries, show 10 first results
spark.sql("select count(*) from table")

DataFrame[count(1): bigint]

In [16]:
# save to parquet
# medline_df.write.parquet('raw_medline.parquet', mode='overwrite')

In [6]:
# Process updates
from pyspark.sql import Window
from pyspark.sql.functions import rank, max, sum, desc

In [8]:
window = Window.partitionBy(['pmid']).orderBy(desc('file_name'))

In [12]:
windowed_df = medline_df.select(
    max('delete').over(window).alias('is_deleted'),
    rank().over(window).alias('pos'),
    '*')

In [None]:
# Calculate TF-IDF in all results

In [None]:
sqlContext.registerDataFrameAsTable(df, "table1")

## Classify with Fastest
- https://www.futurice.com/blog/classifying-text-with-fasttext-in-pyspark
- https://fasttext.cc/docs/en/supervised-tutorial.html