### Fetching the 7z archive

**Skip this section if you have already performed the extraction, and jump to the checkpoint that loads data from the split JSON files.**

In [None]:
# Checking if archive is downloaded in memory.
try:
    dbutils.fs.ls("file:/databricks/driver/dblp.v13.7z")
    print("Archive in filesystem (file:/databricks/driver/dblp.v13.7z)")
except:
    # If archive is not in memory, Checking databricks store for cached version and pulling into memory.
    try:
        dbutils.fs.ls("dbfs:/FileStore/data/dblp.v13.7z")
        print("Archive located in FileStore. Copying into local store..")
        dbutils.fs.cp("dbfs:/FileStore/data/dblp.v13.7z", "file:/databricks/driver/dblp.v13.7z")
        print("Completed")
    except:
        # If archive is not cached, downloading and storing in databricks store.
        print("7z archive not found. Fetching from URL...")
        !wget https://originalstatic.aminer.cn/misc/dblp.v13.7z
        print("7z archive Downloaded. Moving archive to FileStore..")
        dbutils.fs.mkdirs("dbfs:/FileStore/data")
        dbutils.fs.cp("file:/databricks/driver/dblp.v13.7z", "dbfs:/FileStore/data/dblp.v13.7z")
        print("Completed.")

In [None]:
# Sanity check on the local archive: the returned list should contain exactly one
# FileInfo, with size == 2568255035. Kept as the cell's last expression so the
# notebook displays the listing.

dbutils.fs.ls("file:/databricks/driver/dblp.v13.7z")

### Extracting Archive into json

#### 1. Extracting the 7z archive into a 16 GB JSON file

In [None]:
# Install py7zr quietly; it is used by the extraction cell below to read the .7z archive.
!pip install py7zr -q

In [None]:
import py7zr

# Extract every member of dblp.v13.7z (the ~16 GB dblpv13.json) into the current
# working directory. The `with` block closes the archive handle even if extraction
# fails part-way (e.g. when the disk fills up).
with py7zr.SevenZipFile('dblp.v13.7z', mode='r') as archive:
    archive.extractall()

In [None]:
# Confirm the extraction produced dblpv13.json on the driver's local disk (ls raises if it is missing).
dbutils.fs.ls("file:/databricks/driver/dblpv13.json")

#### 2. Cleaning NumberInt(#) tags

The JSON data contains non-conforming `NumberInt(<n>)` tags, so it cannot be parsed as-is. We read the file line by line and replace each tag with the bare number `<n>`. (This takes about 25 minutes.)

In [None]:
import re

# Matches wrappers such as `NumberInt(42)` (presumably left over from a MongoDB
# export); group 1 is the digits inside the parentheses. Compiled once instead of
# being looked up for every line.
NUMBER_INT_TAG = re.compile(r"NumberInt\((\d*)\)")

# Rewrite the dump line by line, so the 16 GB file never has to fit in memory.
# Each `NumberInt(<digits>)` is replaced by the bare `<digits>`, which yields valid
# JSON. UTF-8 is explicit because the data contains non-ASCII text (e.g. the *_zh
# author fields) and the platform default encoding is not guaranteed. The `with`
# blocks close both files even if the rewrite fails part-way.
with open("dblpv13.json", encoding="utf-8") as fin:
    with open("dblpv13_clean.json", "wt", encoding="utf-8") as fout:
        for line in fin:
            fout.write(NUMBER_INT_TAG.sub(r"\1", line))

#### 3. Partitioning Dataset into JSON files
Since the whopping 16 GB of JSON data cannot be loaded into memory at once, we stream it and partition it into smaller chunks (300k records per chunk) for processing.  
Values that ijson yields as `decimal.Decimal` cannot be serialized by the standard `json` encoder, so `DecimalEncoder` writes them out as strings.

In [None]:
import os

# Local output directory for the JSON part files written by the next cell.
# exist_ok=True makes re-running the notebook a silent no-op instead of an error.
os.makedirs("data", exist_ok=True)

In [None]:
import ijson
import json
import decimal

# Number of top-level records written to each part file.
RECORDS_PER_PART = 300000


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``decimal.Decimal`` values as strings.

    ijson yields non-integer numbers as ``Decimal``, which the stock encoder
    rejects. They are emitted as JSON strings, which keeps their exact digits
    (at the cost of no longer being JSON numbers).
    """

    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return str(o)
        return super().default(o)


def _write_part(records, part_id, out_dir):
    """Write ``records`` as one JSON array to ``{out_dir}data_PART_{part_id}.json``."""
    print(f" Saving, data_PART_{part_id}.json in {out_dir}")
    with open(f"{out_dir}data_PART_{part_id}.json", "w", encoding="utf-8") as out:
        out.write(json.dumps(records, cls=DecimalEncoder))


data_dir = 'data/'
# Stream the top-level JSON array item by item (the whole file does not fit in
# memory) and flush every RECORDS_PER_PART records to a part file of its own.
# The input is opened in binary mode because ijson decodes the bytes itself. A
# distinct name is used for the handle so it is not shadowed by the output files.
file_id = 0
file_buffer = []
with open('dblpv13_clean.json', 'rb') as src:
    for obj_data in ijson.items(src, 'item'):
        file_buffer.append(obj_data)
        if len(file_buffer) == RECORDS_PER_PART:
            _write_part(file_buffer, file_id, data_dir)
            file_id += 1
            file_buffer = []

# Flush the remainder. This is skipped when the record count is an exact
# multiple of RECORDS_PER_PART, so no empty "[]" part file is produced.
if file_buffer:
    _write_part(file_buffer, file_id, data_dir)
    file_id += 1
    file_buffer = []

#### 4. Moving files from instance storage to the DBFS FileStore, to make them available later.

In [None]:
# Remove any previously uploaded JSON part files (and the directory itself) from FileStore.
dbutils.fs.rm("dbfs:/FileStore/data/split_data/", recurse = True)
# Recreate the (now empty) target directory in FileStore.
dbutils.fs.mkdirs("dbfs:/FileStore/data/split_data")
# Last expression of the cell, so the notebook displays it: the listing should be empty.
dbutils.fs.ls("dbfs:/FileStore/data/split_data")

In [None]:
# Copy every JSON part file from the driver's local data/ directory into DBFS FileStore,
# so later runs can start at the checkpoint below without re-extracting the archive.
dbutils.fs.cp("file:/databricks/driver/data/", "dbfs:/FileStore/data/split_data", recurse = True)

# Checkpoint after data load

### Reading data from databricks Filestore into dataframes

In [None]:
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, ArrayType  

path = "dbfs:/FileStore/data/split_data/"

# Expected layout: 18 part files of up to 300k records each (the last one holds the
# remainder). The count changes if the split size used when partitioning changes.
part_listing = dbutils.fs.ls(path)
file_count = len(part_listing)
assert file_count == 18, (
    "Data not found. You may want to check the path or run the notebook from start again. "
    "If you updated the split value, ignore this assertion error"
)

In [None]:
# Read all JSON part files with one reader call, so Spark infers a single schema
# across every part (merging field types where parts disagree). The alternative,
# reading each file separately and unioning the results, can fail when the
# independently inferred schemas differ (e.g. nested struct fields or numeric types).
# `inferSchema` is not passed: it is a CSV option that the JSON reader ignores, since
# the JSON reader always infers the schema.
part_paths = [f"{path}data_PART_{num}.json" for num in range(file_count)]
union = spark.read.json(part_paths)
union.printSchema()

In [None]:
parquet_path = "/tmp/out"
# To start from a clean output directory, uncomment:
# dbutils.fs.rm(f"{parquet_path}", recurse = True)

def buildFoSTable(dataframe):
    """Build the Field-of-Study dimension table from the papers dataframe.

    Steps: explode the ``fos`` array into one row per field name, replace null
    names with ``"Unknown"`` (``explode_outer`` yields null for papers with a
    null or empty ``fos``), de-duplicate, and overwrite
    ``{parquet_path}/field_of_study.parquet`` with the distinct names. The names
    are then read back from that same path and each gets a surrogate ``id``.

    Args:
        dataframe: Spark DataFrame with an array column ``fos``.

    Returns:
        DataFrame with columns ``id`` and ``Field_of_Study``. Ids come from
        ``monotonically_increasing_id``: unique, but not consecutive.
    """
    fos_parquet = f"{parquet_path}/field_of_study.parquet"

    fosFrame = (
        dataframe.select(F.explode_outer("fos").alias("Field_of_Study"))
        .fillna("Unknown")
        .distinct()
    )
    # The rows are already distinct and the write overwrites, so the file holds
    # exactly the distinct names and needs no second de-duplication after reading.
    fosFrame.write.mode('overwrite').parquet(fos_parquet)

    # Read back from the path just written (not a hard-coded location) and index
    # the persisted rows.
    indexed = spark.read.parquet(fos_parquet).withColumn("id", F.monotonically_increasing_id())
    return indexed.select("id", "Field_of_Study")
    

In [None]:
# Build the Field-of-Study dimension table (buildFoSTable also writes it to parquet)
# from the combined papers dataframe.
FoSFrame = buildFoSTable(union)
# Display the id / Field_of_Study table. Mapping this relation into the fact table
# (the FoS_fk lookup) happens in the following cells.
display(FoSFrame)

In [None]:
# Reload the persisted Field-of-Study names from the same location buildFoSTable
# writes to, and give each name a surrogate id.
FoSFrame = spark.read.parquet(f"{parquet_path}/field_of_study.parquet")
FoSFrame = FoSFrame.withColumn("id", F.monotonically_increasing_id())

# Driver-side dict {Field_of_Study: id}. collectAsMap uses the first column as the
# key and the second as the value, so both columns are selected in that order.
FoSrdd = FoSFrame.select("Field_of_Study", "id").rdd.collectAsMap()

# Spark map literal name -> id, used to look up the foreign key for each fact row.
# Iterating keys() and values() of the same dict yields matching orders.
FoSrdd_map = F.map_from_arrays(
    F.array(*map(F.lit, FoSrdd.keys())),
    F.array(*map(F.lit, FoSrdd.values())),
)

In [None]:
# One row per (paper, field of study) pair, keeping every original column as well.
# explode_outer keeps papers whose `fos` is null/empty; they get a null Field_of_Study.
fos_column = F.explode_outer("fos").alias("Field_of_Study")
fosUnion = union.select(fos_column, '*')
fosUnion.printSchema()

In [None]:
# Resolve each row's Field_of_Study to its dimension-table id through FoSrdd_map.
# Null names (papers without fields of study, kept by explode_outer) are looked up
# as "Unknown", the placeholder the dimension table stores for them. They are not
# defaulted to 0, because 0 is itself a valid id from monotonically_increasing_id
# (the first row of the first partition), so that default would link these papers
# to an arbitrary field. Names absent from the map stay null.
fosUnion = fosUnion.withColumn(
    'FoS_fk',
    FoSrdd_map.getItem(F.coalesce(F.col('Field_of_Study'), F.lit('Unknown'))),
)

display(fosUnion)

In [None]:
# Attach the matching dimension row to each fact row (left join keeps unmatched rows).
# The join condition has to compare two columns: the string form
# `"FoS_fk" == FoSFrame.id` evaluates to `FoSFrame.id == lit("FoS_fk")`, which
# matches nothing. Field_of_Study exists on both sides, so the projection names
# the side each column comes from. The dimension's name is shown to confirm the lookup.
fosUnion = fosUnion.join(
    F.broadcast(FoSFrame),
    fosUnion["FoS_fk"] == FoSFrame["id"],
    how="left",
).select(fosUnion["FoS_fk"], fosUnion["_id"], FoSFrame["id"], FoSFrame["Field_of_Study"])

In [None]:
# Display the result of the fact/dimension join above.
display(fosUnion)

## PLAYGROUND/SANDBOX snippets

Test scripts that work with a single chunk to reduce processing time.

In [None]:
# Location of the JSON part files in DBFS FileStore.
path = "dbfs:/FileStore/data/split_data/"

# Read only the first part file so experiments run quickly. No `inferSchema`
# option: it is CSV-only and ignored by the JSON reader, which always infers.
first_frame = spark.read.json(f"{path}data_PART_0.json")
first_frame.show()

In [None]:
# Print the schema Spark inferred for the first part file.
first_frame.printSchema()

In [None]:
# Authors table (sandbox: first chunk only).
#
# explode_outer emits one row per element of the `authors` array (a paper with a
# null/empty array yields one null row). Only the exploded struct column is kept.
authorsDF = first_frame.select(F.explode_outer("authors").alias("authors"))

# Flatten the struct: each SQL expression "authors.<field>" becomes a top-level
# column named <field>.
AUTHOR_FIELDS = (
    "_id", "bio", "email", "gid", "name", "name_zh", "oid", "oid_zh",
    "orcid", "org", "org_zh", "orgid", "orgs", "orgs_zh", "sid",
)
authorsDF = authorsDF.selectExpr(*(f"authors.{field}" for field in AUTHOR_FIELDS))

authorsDF.printSchema()

In [None]:
# Show the first rows of the flattened authors table.
authorsDF.show()

In [None]:
# https://github.com/patelatharva/Data_Lake_with_Apache_Spark/blob/master/etl.py