In [1]:
# Report the Spark version of the active SparkContext. `sc` is not created in
# this notebook — presumably injected by the PySpark kernel (confirm).
sc.version

'3.1.1'

### Imports

In [21]:
from configparser import ConfigParser
from pathlib import Path
import os
import subprocess
import time


### Define directories

In [3]:
# Define directories
#
# Relevant directories are read from the [spark] section of the config file:
# dir_data:    full path to hdfs directory where the raw data .gz files are stored
# dir_parquet: full path to hdfs directory where the parquet tables will be stored
#
# NOTE: both values are wrapped in pathlib.Path, which collapses repeated
# slashes. They should therefore be scheme-less paths (e.g. /user/me/data);
# a URI such as hdfs://host:8020/data would become hdfs:/host:8020/data.

CONFIG_FILE = "config.cf"

cf = ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail early with a clear message instead of a
# confusing NoSectionError on the first cf.get() below.
if not cf.read(CONFIG_FILE):
    raise FileNotFoundError(f"Could not read configuration file {CONFIG_FILE!r}")

dir_data = Path(cf.get("spark", "dir_data"))
dir_parquet = Path(cf.get("spark", "dir_parquet"))

In [22]:
# Configuration hdfs: a Hadoop FileSystem handle built from the Spark session's
# Hadoop configuration (reached through the py4j JVM gateway).
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration()
)

# Hadoop Path objects for the output (parquet) and input (raw data) directories
hdfs_dir_parquet, hdfs_dir_data = (
    spark._jvm.org.apache.hadoop.fs.Path(local_dir.as_posix())
    for local_dir in (dir_parquet, dir_data)
)

# Make sure the parquet output directory exists before anything is written to it
if not fs.exists(hdfs_dir_parquet):
    fs.mkdirs(hdfs_dir_parquet)


### Convert every raw table (CSV / .gz files) to Parquet

In [60]:
%%time
tables = [el.getPath().getName() for el in fs.listStatus(hdfs_dir_data)]
tables = list(set([el.split("_")[0] for el in tables if 
              (el.endswith(".gz") or el.endswith(".csv"))]))

for tbl in sorted(tables):
    # Read all gz of same table
    df = spark.read.csv(f"{dir_data.joinpath(tbl).as_posix()}*", header=True)
    # Save to parquet
    df.write.parquet(
        dir_parquet.joinpath(f"{tbl}.parquet").as_posix(),
        mode="overwrite",
    )
    print("Processed table", tbl, "- Rows:", df.count())

22/03/01 23:30:47 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
	at org.apache.spark.scheduler.EventLoggingListener.r

Processed table tls201 - Rows: 114690034


                                                                                

Processed table tls202 - Rows: 94685759


22/03/01 23:34:05 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
	at org.apache.spark.scheduler.EventLoggingListener.r

Processed table tls203 - Rows: 72641357


                                                                                

Processed table tls204 - Rows: 45965380


                                                                                

Processed table tls205 - Rows: 4158216


                                                                                

Processed table tls206 - Rows: 81582243


                                                                                

Processed table tls207 - Rows: 298723394


                                                                                

Processed table tls209 - Rows: 292526954


                                                                                

Processed table tls210 - Rows: 25980433


                                                                                

Processed table tls211 - Rows: 135250328


                                                                                

Processed table tls212 - Rows: 425171934


                                                                                

Processed table tls214 - Rows: 31141529


                                                                                

Processed table tls215 - Rows: 732932899


                                                                                

Processed table tls216 - Rows: 4291778


                                                                                

Processed table tls222 - Rows: 360735009


                                                                                

Processed table tls223 - Rows: 39008342


                                                                                

Processed table tls224 - Rows: 326003568


                                                                                

Processed table tls225 - Rows: 146158190


                                                                                

Processed table tls226 - Rows: 98771304


                                                                                

Processed table tls227 - Rows: 387970770


                                                                                

Processed table tls228 - Rows: 219643872


                                                                                

Processed table tls229 - Rows: 133578287


                                                                                

Processed table tls230 - Rows: 133674867


                                                                                

Processed table tls231 - Rows: 357150176
Processed table tls801 - Rows: 242
Processed table tls803 - Rows: 3962
Processed table tls901 - Rows: 764
Processed table tls902 - Rows: 850
Processed table tls904 - Rows: 2056
CPU times: user 3.18 s, sys: 687 ms, total: 3.87 s
Wall time: 44min 42s


## Create a joined table of patent applications (tls201 + tls202 + tls203)


In [62]:
%%time

df_201 = spark.read.parquet(dir_parquet.joinpath(f"tls201.parquet").as_posix())
df_202 = spark.read.parquet(dir_parquet.joinpath(f"tls202.parquet").as_posix())
df_203 = spark.read.parquet(dir_parquet.joinpath(f"tls203.parquet").as_posix())

patstat_appln = (df_201.join(df_202, df_201.appln_id ==  df_202.appln_id, "left")
                      .drop(df_202.appln_id)
                      .join(df_203, df_201.appln_id ==  df_203.appln_id, "left")
                      .drop(df_203.appln_id)
                )

patstat_appln.write.parquet(
        dir_parquet.joinpath(f"patstat_appln.parquet").as_posix(),
        mode="overwrite",
    )

print("Created table patstat_appln", "- Rows:", patstat_appln.count())

22/03/02 00:27:43 ERROR scheduler.TaskSchedulerImpl: Lost executor 9 on node60.cluster.tsc.uc3m.es: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:27:43 WARN scheduler.TaskSetManager: Lost task 26.0 in stage 160.0 (TID 8534) (node60.cluster.tsc.uc3m.es executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:27:43 WARN scheduler.TaskSetManager: Lost task 16.0 in stage 160.0 (TID 8524) (node60.cluster.tsc.uc3m.es executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:27:43 WARN scheduler.TaskSetManager: Lost task 36.0 

22/03/02 00:29:26 ERROR scheduler.TaskSchedulerImpl: Lost executor 7 on node56.cluster.tsc.uc3m.es: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:29:26 WARN scheduler.TaskSetManager: Lost task 113.1 in stage 160.0 (TID 8648) (node56.cluster.tsc.uc3m.es executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:29:26 WARN scheduler.TaskSetManager: Lost task 115.0 in stage 160.0 (TID 8635) (node56.cluster.tsc.uc3m.es executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 00:29:26 WARN scheduler.TaskSetManager: Lost task 112

Created table tls904 - Rows: 114690034
CPU times: user 394 ms, sys: 75.3 ms, total: 470 ms
Wall time: 6min 57s


                                                                                

In [69]:
# Reload the joined application table and keep only rows that have an abstract
patstat_appln = spark.read.parquet(
    dir_parquet.joinpath("patstat_appln.parquet").as_posix()
)
patstat_appln = patstat_appln.where(patstat_appln["appln_abstract"].isNotNull())
# Number of applications with a non-null abstract
patstat_appln.count()

22/03/02 01:07:24 ERROR scheduler.TaskSchedulerImpl: Lost executor 11 on node89.cluster.tsc.uc3m.es: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:24 WARN scheduler.TaskSetManager: Lost task 138.0 in stage 179.0 (TID 9784) (node89.cluster.tsc.uc3m.es executor 11): ExecutorLostFailure (executor 11 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:24 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 179.0 (TID 9780) (node89.cluster.tsc.uc3m.es executor 11): ExecutorLostFailure (executor 11 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:24 WARN scheduler.TaskSetManager: Lost task

22/03/02 01:07:28 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on node73.cluster.tsc.uc3m.es: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:28 WARN scheduler.TaskSetManager: Lost task 89.1 in stage 179.0 (TID 9831) (node73.cluster.tsc.uc3m.es executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:28 WARN scheduler.TaskSetManager: Lost task 138.1 in stage 179.0 (TID 9834) (node73.cluster.tsc.uc3m.es executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
22/03/02 01:07:33 ERROR scheduler.TaskSchedulerImpl: Lost execu

72641357