<h1>Data to PySpark RDD<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Import-dependencies" data-toc-modified-id="Import-dependencies-1">Import dependencies</a></span></li><li><span><a href="#Load-data-to-a-Pyspark-RDD" data-toc-modified-id="Load-data-to-a-Pyspark-RDD-2">Load data to a Pyspark RDD</a></span><ul class="toc-item"><li><span><a href="#Read-data-from-a-single-day-1st-of-January-2016" data-toc-modified-id="Read-data-from-a-single-day-1st-of-January-2016-2.1">Read data from a single day 1st of January 2016</a></span></li><li><span><a href="#Gather-data-for-the-entire-year-into-a-single-rdd" data-toc-modified-id="Gather-data-for-the-entire-year-into-a-single-rdd-2.2">Gather data for the entire year into a single rdd</a></span></li></ul></li></ul></div>

## Import dependencies

In [1]:
import os
import warnings

from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Silence all Python warnings for the rest of the session.
warnings.simplefilter("ignore")

## Load data to a Pyspark RDD

### Read data from a single day 1st of January 2016

In [2]:
file_path = "../datasets/PubMed - medium samples/2016/2016-01-01_2016-01-02/"

# Parse the XML under `file_path` with the spark-xml reader, producing one
# DataFrame row per <record> element.
df = (
    sqlContext.read
    .format('com.databricks.spark.xml')
    .option("rowTag", "record")
    .load(file_path)
)

In [3]:
# Enumerate the top-level fields inferred for each <record>. This yields
# Column objects; `df.columns` would give the same names as plain strings.
list(df)

[Column<b'_corrupt_record'>,
 Column<b'abstract'>,
 Column<b'back'>,
 Column<b'body'>,
 Column<b'bold'>,
 Column<b'boxed-text'>,
 Column<b'counts'>,
 Column<b'custom-meta-group'>,
 Column<b'disp-formula'>,
 Column<b'ext-link'>,
 Column<b'fig'>,
 Column<b'funding-group'>,
 Column<b'graphic'>,
 Column<b'header'>,
 Column<b'inline-formula'>,
 Column<b'italic'>,
 Column<b'kwd'>,
 Column<b'kwd-group'>,
 Column<b'list'>,
 Column<b'list-item'>,
 Column<b'metadata'>,
 Column<b'notes'>,
 Column<b'p'>,
 Column<b'sec'>,
 Column<b'self-uri'>,
 Column<b'sub'>,
 Column<b'sup'>,
 Column<b'table-wrap'>,
 Column<b'table-wrap-foot'>,
 Column<b'tbody'>,
 Column<b'td'>,
 Column<b'tr'>,
 Column<b'trans-abstract'>,
 Column<b'xref'>]

In [4]:
# Number of <record> rows parsed from the single-day folder.
df.count()

457

In [5]:
# describe() returns another DataFrame, so only its repr is displayed here;
# chain .show() to print the actual summary statistics.
df.describe()

DataFrame[summary: string, _corrupt_record: string, bold: string]

In [6]:
# Re-read the same folder, this time emitting one row per <abstract> element
# instead of one per <record>. Note that this rebinds `df`.
df = (
    sqlContext.read
    .format('com.databricks.spark.xml')
    .option("rowTag", "abstract")
    .load(file_path)
)
df.show(1)

+--------------+----+----+--------+----+--------------+------+-------------+----+--------------------+--------------+----+----+-----+---------+
|_abstract-type| _id|bold|ext-link| fig|inline-formula|italic|named-content|   p|                 sec|styled-content| sub| sup|title|underline|
+--------------+----+----+--------+----+--------------+------+-------------+----+--------------------+--------------+----+----+-----+---------+
|          null|null|null|    null|null|          null|  null|         null|null|[[S1,,,, [[Neurof...|          null|null|null| null|     null|
+--------------+----+----+--------+----+--------------+------+-------------+----+--------------------+--------------+----+----+-----+---------+
only showing top 1 row



### Gather data for the entire year into a single rdd

In [9]:
root = "../datasets/PubMed - medium samples/2016/"
MAX_DAYS = 10  # stop after this many daily folders (sample run, not the full year)

all_data_df = None    # union of every daily DataFrame read so far
schema = None         # schema inferred from the first folder, reused for the rest
total_records = 0     # running size of all_data_df

for count_days, filename in enumerate(sorted(os.listdir(root)), start=1):
    filename_abs = os.path.join(root, filename)

    reader = sqlContext.read.format('com.databricks.spark.xml')\
                            .option("rowTag", "record")
    if schema is None:
        # First folder: let spark-xml infer the schema.
        df_current = reader.load(filename_abs)
    else:
        # Later folders: force the first folder's schema so the positional
        # union below always lines up column-for-column.
        df_current = reader.load(filename_abs, schema=schema)

    if all_data_df is None:
        all_data_df = df_current
        schema = df_current.schema
    else:
        all_data_df = all_data_df.union(df_current)

    # union() keeps duplicates, so the size of the combined frame is the sum of
    # the daily counts; tracking it here avoids re-scanning every folder read so
    # far on each iteration.
    current_count = df_current.count()
    total_records += current_count

    print(filename.split('.')[0] + " -- " + str(current_count) + " read articles.")
    print("\t  all_data_df -- " + str(total_records) + " records.")

    # Persisting all_data_df back to XML was left disabled in this notebook.
    if count_days >= MAX_DAYS:
        break

2016-01-01_2016-01-02 -- 457 read articles.
	  all_data_df -- 457 records.
2016-01-02_2016-01-03 -- 0 read articles.
	  all_data_df -- 457 records.
2016-01-03_2016-01-04 -- 0 read articles.
	  all_data_df -- 457 records.
2016-01-04_2016-01-05 -- 7 read articles.
	  all_data_df -- 464 records.
2016-01-05_2016-01-06 -- 95 read articles.
	  all_data_df -- 559 records.
2016-01-06_2016-01-07 -- 201 read articles.
	  all_data_df -- 760 records.
2016-01-07_2016-01-08 -- 132 read articles.
	  all_data_df -- 892 records.
2016-01-08_2016-01-09 -- 98 read articles.
	  all_data_df -- 990 records.
2016-01-09_2016-01-10 -- 0 read articles.
	  all_data_df -- 990 records.
2016-01-10_2016-01-11 -- 0 read articles.
	  all_data_df -- 990 records.


In [10]:
# Hand-written schema: read only a handful of fields from each <record>
# instead of letting spark-xml infer the full structure.
customSchema = StructType([
    StructField("body", StringType(), True),
    StructField("metadata", StringType(), True),
    StructField("td", StringType(), True),
    # NOTE(review): "tr" is declared as a double while every other field is a
    # string -- confirm this is intentional.
    StructField("tr", DoubleType(), True),
    # NOTE(review): "page-count" and "title" are not among the top-level
    # columns inferred for <record> earlier in this notebook -- verify that
    # they can be populated from this position in the schema.
    StructField("page-count", StringType(), True),
    StructField("title", StringType(), True),
])

df = (
    spark.read
    .format('com.databricks.spark.xml')
    .options(rowTag='record')
    .load(file_path, schema=customSchema)
)

# Limit on the Spark side so only the previewed rows are collected to the
# driver, rather than converting the whole DataFrame to pandas first.
df.limit(5).toPandas()

Unnamed: 0,body,metadata,td,tr,page-count,title
0,<sec>\n <title>Introduction</title><p>Neu...,<article>\n <front>\n <journal-meta>\n ...,,,,
1,,<article>\n <front>\n <journal-meta>\n ...,Petty et al,,,
2,,<article>\n <front>\n <journal-meta>\n ...,,,,
3,,<article>\n <front>\n <journal-meta>\n ...,,,,
4,,<article>\n <front>\n <journal-meta>\n ...,,,,


In [11]:
# Preview the first five rows as a text table (long cell values are truncated by show()).
df.show(5)

+--------------------+--------------------+-----------+----+----------+-----+
|                body|            metadata|         td|  tr|page-count|title|
+--------------------+--------------------+-----------+----+----------+-----+
|<sec>
      <titl...|<article>
  <fron...|       null|null|      null| null|
|                null|<article>
  <fron...|Petty et al|null|      null| null|
|                null|<article>
  <fron...|       null|null|      null| null|
|                null|<article>
  <fron...|       null|null|      null| null|
|                null|<article>
  <fron...|       null|null|      null| null|
+--------------------+--------------------+-----------+----+----------+-----+
only showing top 5 rows

