In [1]:
# https://mmuratarat.github.io/2020-06-18/pyspark-postgresql-locally
# https://www.youtube.com/watch?v=78FWrtEVYeA
# https://opensource.com/article/18/11/pyspark-jupyter-notebook

# Configure local Spark

In [2]:
import os
import sys

# Point both the driver and the Python workers at the interpreter that is
# running this kernel. "jupyter" is a launcher, not a Python interpreter, so
# using it as PYSPARK_PYTHON would break any job that has to spawn Python
# worker processes (UDFs, RDD operations).
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [3]:
from pyspark.sql import SparkSession
import sparknlp

In [4]:
# Build the local Spark session used by the rest of the notebook. The Spark NLP
# package and the PostgreSQL JDBC jar are both requested here, at session
# creation time.
spark = (
    SparkSession.builder
    .appName("capstone")
    .master("local[*]")
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.0.1")
    # Kryo has to be selected explicitly: the buffer limit on the next line is
    # only honoured when KryoSerializer is the active serializer.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "2000m")
    # NOTE(review): relative path, so the jar is presumably expected in the
    # notebook's working directory; confirm.
    .config("spark.jars", "postgresql-42.2.19.jar")
    .getOrCreate()
)

In [5]:
# Record the Spark NLP and Spark versions this run used, one per line.
print(sparknlp.version(), spark.version, sep="\n")

3.0.1
3.1.1


In [6]:
import os

# Shared JDBC reader for the PostgreSQL database. Each table-loading cell sets
# its own "dbtable" option on this reader before calling load().
#
# Connection settings can be overridden through environment variables so that
# real credentials never have to be committed with the notebook; the fallbacks
# are the original local development values.
base_query = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("EKSI_JDBC_URL", "jdbc:postgresql://localhost:6021/eksi"))
    .option("user", os.environ.get("EKSI_DB_USER", "postgres"))
    .option("password", os.environ.get("EKSI_DB_PASSWORD", "docker"))
    .option("driver", "org.postgresql.Driver")
)

# Load tables

In [7]:
# Load the "entries" table through the shared JDBC reader and inspect its schema.
df_entries = (
    base_query
    .option("dbtable", "entries")
    .load()
)
df_entries.printSchema()

root
 |-- entry_id: integer (nullable = true)
 |-- date_updated: timestamp (nullable = true)
 |-- date_created: timestamp (nullable = true)
 |-- author_id: integer (nullable = true)
 |-- topic_id: integer (nullable = true)
 |-- content: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)



In [8]:
# Load the "authors" table through the shared JDBC reader and inspect its schema.
df_authors = (
    base_query
    .option("dbtable", "authors")
    .load()
)
df_authors.printSchema()

root
 |-- author_id: integer (nullable = true)
 |-- badges: string (nullable = true)
 |-- nick: string (nullable = true)
 |-- total_entry_count: integer (nullable = true)
 |-- last_entry_date: timestamp (nullable = true)
 |-- is_deactivated: boolean (nullable = true)
 |-- is_cursed: boolean (nullable = true)
 |-- follower_count: integer (nullable = true)
 |-- followings_count: integer (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- favorite_entries: string (nullable = true)
 |-- favorited_entries: string (nullable = true)



In [9]:
# Load the "topics" table through the shared JDBC reader and inspect its schema.
df_topics = (
    base_query
    .option("dbtable", "topics")
    .load()
)
df_topics.printSchema()

root
 |-- topic_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- entry_count: integer (nullable = true)



In [10]:
# Load the "entry_dates" table through the shared JDBC reader and inspect its schema.
df_entry_dates = (
    base_query
    .option("dbtable", "entry_dates")
    .load()
)
df_entry_dates.printSchema()

root
 |-- date_created: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)



In [11]:
# Load the "entry_update_dates" table through the shared JDBC reader and
# inspect its schema.
df_entry_update_dates = (
    base_query
    .option("dbtable", "entry_update_dates")
    .load()
)
df_entry_update_dates.printSchema()

root
 |-- date_updated: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)



# Modify columns

In [12]:
# Remove the comment_count column, then discard rows that are exact duplicates.
df_entries = (
    df_entries
    .drop("comment_count")
    .drop_duplicates()
)

In [13]:
# Print the entries schema again to confirm comment_count is gone.
df_entries.printSchema()

root
 |-- entry_id: integer (nullable = true)
 |-- date_updated: timestamp (nullable = true)
 |-- date_created: timestamp (nullable = true)
 |-- author_id: integer (nullable = true)
 |-- topic_id: integer (nullable = true)
 |-- content: string (nullable = true)
 |-- favorite_count: integer (nullable = true)



In [14]:
# Remove three author columns, then discard rows that are exact duplicates.
dropped_author_columns = ['is_deactivated', 'is_cursed', 'picture_url']
df_authors = df_authors.drop(*dropped_author_columns).drop_duplicates()

In [15]:
# Print the authors schema again to confirm the three columns were dropped.
df_authors.printSchema()

root
 |-- author_id: integer (nullable = true)
 |-- badges: string (nullable = true)
 |-- nick: string (nullable = true)
 |-- total_entry_count: integer (nullable = true)
 |-- last_entry_date: timestamp (nullable = true)
 |-- follower_count: integer (nullable = true)
 |-- followings_count: integer (nullable = true)
 |-- favorite_entries: string (nullable = true)
 |-- favorited_entries: string (nullable = true)



# Explore badges

In [23]:
# Expose the authors frame to Spark SQL as global_temp.authors.
# createOrReplace* keeps the cell re-runnable: plain createGlobalTempView
# raises once a view with this name already exists in the session.
df_authors.createOrReplaceGlobalTempView("authors")

In [26]:
# Authors whose badges string is longer than one character.
badges_sql = """
Select badges, author_id From global_temp.authors
Where length(badges) > 1
"""
authors_with_badges = spark.sql(badges_sql)

In [35]:
# Preview the badge holders (show() prints only the first 20 rows here).
authors_with_badges.show()

+---------------+---------+
|         badges|author_id|
+---------------+---------+
|         azimli|    24956|
|         azimli|    25113|
|         azimli|    32345|
|         azimli|    68363|
|         azimli|   210965|
|         merhum|   270348|
|         azimli|   491243|
|         azimli|   513363|
|         azimli|   526608|
|         azimli|   536725|
|         azimli|   603986|
|         azimli|   729738|
|azimli,tasnifçi|   759951|
|         azimli|   772773|
|         azimli|   772789|
|         azimli|   783288|
|         azimli|   784487|
|         azimli|   937849|
|         azimli|  1000198|
|         azimli|  1110869|
+---------------+---------+
only showing top 20 rows



# Missing authors

In [14]:
# Register both views that the missing-authors query reads. Using
# createOrReplace* makes this cell safe to re-run and lets it register the
# authors view itself instead of relying on an earlier cell having done so.
df_authors.createOrReplaceGlobalTempView("authors")
df_entries.createOrReplaceGlobalTempView("entries")

In [15]:
# Distinct author ids that occur in entries but have no row in authors.
missing_authors_sql = """
Select distinct author_id From global_temp.entries Where author_id not in (
	Select entries.author_id From global_temp.entries
	Join global_temp.authors using(author_id)
)
"""
authors_not_processed = spark.sql(missing_authors_sql)

In [16]:
# List the author ids found in entries that have no matching authors row.
authors_not_processed.show()

+---------+
|author_id|
+---------+
|   425654|
+---------+



In [17]:
# TODO: add these authors to the parse queue

# NLP Test

In [25]:
import json
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# NOTE(review): `spark` is rebound here. sparknlp.start() presumably goes
# through SparkSession.builder.getOrCreate(), in which case it hands back the
# session built earlier in the notebook rather than a fresh one; confirm
# before relying on either behavior.
spark = sparknlp.start()

In [26]:
# Name of the pretrained NER model to load. The pipeline cell picks matching
# embeddings for "ner_dl", "onto_100" and "ner_dl_bert".
MODEL_NAME = "onto_100"

In [27]:
# Sample English passages that are fed through the NER pipeline below.
text_list = [
    """William Henry Gates III (born October 28, 1955) is an American business magnate, software developer, investor, and philanthropist. He is best known as the co-founder of Microsoft Corporation. During his career at Microsoft, Gates held the positions of chairman, chief executive officer (CEO), president and chief software architect, while also being the largest individual shareholder until May 2014. He is one of the best-known entrepreneurs and pioneers of the microcomputer revolution of the 1970s and 1980s. Born and raised in Seattle, Washington, Gates co-founded Microsoft with childhood friend Paul Allen in 1975, in Albuquerque, New Mexico; it went on to become the world's largest personal computer software company. Gates led the company as chairman and CEO until stepping down as CEO in January 2000, but he remained chairman and became chief software architect. During the late 1990s, Gates had been criticized for his business tactics, which have been considered anti-competitive. This opinion has been upheld by numerous court rulings. In June 2006, Gates announced that he would be transitioning to a part-time role at Microsoft and full-time work at the Bill & Melinda Gates Foundation, the private charitable foundation that he and his wife, Melinda Gates, established in 2000.[9] He gradually transferred his duties to Ray Ozzie and Craig Mundie. He stepped down as chairman of Microsoft in February 2014 and assumed a new post as technology adviser to support the newly appointed CEO Satya Nadella.""",
    """The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."""
]

In [28]:
# Wrap the raw `text` column into the `document` annotation column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol('text')
    .setOutputCol('document')
)

# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('token')
)

# The embeddings stage has to match the embeddings the NER model was trained
# with, so it is chosen from MODEL_NAME.
if MODEL_NAME in ("ner_dl", "onto_100"):
    # ner_dl and onto_100 are trained with glove_100d.
    embeddings = (
        WordEmbeddingsModel.pretrained('glove_100d')
        .setInputCols(['document', 'token'])
        .setOutputCol('embeddings')
    )
elif MODEL_NAME == "ner_dl_bert":
    # The BERT-based model uses BERT embeddings.
    embeddings = (
        BertEmbeddings.pretrained(name='bert_base_cased', lang='en')
        .setInputCols(['document', 'token'])
        .setOutputCol('embeddings')
    )
else:
    # Fail here with a clear message. Without this branch an unsupported name
    # surfaces later as a NameError, or silently reuses an `embeddings` object
    # left in the kernel by an earlier run.
    raise ValueError(
        f"Unsupported MODEL_NAME {MODEL_NAME!r}; "
        "expected one of 'ner_dl', 'onto_100', 'ner_dl_bert'"
    )

# Tag every token with an entity label.
ner_model = (
    NerDLModel.pretrained(MODEL_NAME, 'en')
    .setInputCols(['document', 'token', 'embeddings'])
    .setOutputCol('ner')
)

# Turn the per-token tags into entity chunks.
ner_converter = (
    NerConverter()
    .setInputCols(['document', 'token', 'ner'])
    .setOutputCol('ner_chunk')
)

# Stage order matters: each stage consumes columns produced by earlier ones.
nlp_pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    embeddings,
    ner_model,
    ner_converter,
])

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
onto_100 download started this may take some time.
Approximate size to download 13.5 MB
[OK!]


In [29]:
# Sample passages as a Spark frame with a single `text` column.
texts_pdf = pd.DataFrame({'text': text_list})
df = spark.createDataFrame(texts_pdf)

# Fit the pipeline on a one-row frame holding an empty string, then run the
# fitted model over the sample passages.
empty_df = spark.createDataFrame([['']]).toDF('text')
pipeline_model = nlp_pipeline.fit(empty_df)
result = pipeline_model.transform(df)

In [None]:
# Show only the recognized chunks and their metadata. Collecting the whole
# frame would pull every annotation column (tokens, embeddings, tags) back to
# the driver and flood the cell output.
result.select(
    F.col("ner_chunk.result").alias("chunk"),
    F.col("ner_chunk.metadata").alias("chunk_metadata"),
).show(truncate=100)