# Use Case 1: Sentiment Analysis of L&T ECC Vendors

## Dealing with Semi-Structured Data Stored in HDFS on SQL Server 2019 BDC

-   In this notebook we process and transform JSON news data, prepare it for model scoring, and label each news item with a sentiment by sending requests to an external REST API.
-   The model endpoint is a REST API developed outside SQL Server 2019 BDC and hosted in Azure, serving both batch and live scoring.
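The request and response shapes used later in this notebook can be summarized in plain Python: the payload is a JSON object with `news` (the cleaned summary) and `name` (the searched company), and the response carries `_doc_polarity` plus a `scores` object keyed `Positive`, `Negative` and `Neutral`. A minimal sketch, assuming those are the only fields involved; `build_payload` and `parse_response` are hypothetical helpers, not part of the service, and no request is sent:

```python
import json


def build_payload(summary, company):
    """Serialize one news item into the JSON body the scoring endpoint expects
    (field names as used in this notebook; json.dumps handles escaping)."""
    return json.dumps({"news": summary, "name": company})


def parse_response(body):
    """Return (polarity, scores) from a decoded JSON response.
    Missing score keys come back as None, like the nullable struct fields."""
    scores = body.get("scores", {})
    return body["_doc_polarity"], {k: scores.get(k) for k in ("Positive", "Negative", "Neutral")}


# illustrative values shaped like the first prediction shown further down
payload = build_payload('Siemens gets "CCI nod"', "C&S ELECTRIC LIMITED")
polarity, scores = parse_response(
    {"_doc_polarity": "Positive", "scores": {"Positive": 0.625, "Negative": 0.0, "Neutral": 0.094}})
print(payload)
print(polarity, scores)
```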

In [2]:
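from pyspark.sql import SparkSession

# When run through sparkmagic/Livy (as the session table below suggests) a session already
# exists: getOrCreate() returns it, and resource settings such as executor/driver memory
# only take effect when a new session is started.
spark = SparkSession\
        .builder\
        .appName("Spark_Ingestion_Job")\
        .config("spark.executor.memory", "20g")\
        .config("spark.executor.instances", "3")\
        .config("spark.master", "yarn")\
        .config("spark.submit.deployMode", "client")\
        .config("spark.driver.memory", "30g")\
        .enableHiveSupport()\
        .getOrCreate()
sc = spark.sparkContext  # the RDD cells below use `sc`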

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
36,application_1601979426726_0043,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



### Loading Data

In [None]:
# read the file as an RDD of text lines (one JSON object per line), then parse it into a DataFrame
news_data_rdd = sc.textFile('/COE/news_data/contify_insights_new.json')
news_data_df = spark.read.option("multiline", "true").json(news_data_rdd)
news_data_df.show(2)
print(type(news_data_df))
print(news_data_df.dtypes)

In [47]:
# alternative: parse each line into a Python dict with the RDD API
import json
news_data_rdd = sc.textFile('/COE/news_data/contify_insights_new.json').map(json.loads)
news_data_rdd.take(2)


[{'results': [{'source': {'id': '1829', 'name': 'Domain-b', 'rank': 789288}, 'previews': [], 'url': 'https://www.domain-b.com/companies/companies_s/Siemens/20200824_acquisition.html', 'attachments': [], 'duplicates': [], 'content_types': [{'id': 3, 'name': 'News Articles'}], 'language': {'id': 'en', 'name': 'English'}, 'channel': 'News and Other Websites', 'summary': 'Siemens gets CCI nod for proposed acquisition of C&S Electric The Competition Commission of India (CCI) has approved the proposed acquisition of C&S Electric Limited by Siemens Limited. The combination envisages acquisition of 100 per cent acquisition of the share capital of C&S Electric Limited by Siemens India. At the time of closing of the proposed combination, the scope of business of C&S shall include low-voltage (LV) switchgear components and panels, LV and medium voltage (MV) power busbars as well as protection and metering devices of C&S. Certain other businesses of C&S, such as MV switchgear and package sub-stati

In [84]:
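from pyspark.sql import Row

def spliter(lines):
    """Return one dict per news item in lines['results'], each tagged with the searched company."""
    records = []
    for d in lines['results'] or []:
        records.append({'id': d['id'],
                        'title': d['title'],
                        'summary': d['summary'],
                        'search_company': lines['search_company']})
    return records

# flatMap emits one Row per news item (a single response can contain several)
rdd_df = news_data_rdd.flatMap(lambda x: [Row(**d) for d in spliter(x)]).toDF()
rdd_df.show(1)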


+--------------+--------------------+--------------------+--------------------+
|            id|      search_company|             summary|               title|
+--------------+--------------------+--------------------+--------------------+
|20082425521615|C&S ELECTRIC LIMI...|Siemens gets CCI ...|Siemens gets CCI ...|
+--------------+--------------------+--------------------+--------------------+
only showing top 1 row
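A `results` array can hold several news items, so flattening has to emit one record per item rather than one per API response; in Spark that is a `flatMap` over the parsed RDD. The per-record logic is plain Python and can be checked without a cluster. A sketch assuming each item may carry `id`, `title` and `summary` (shapes follow the schema printed in the pre-processing section); `flatten_results` is a hypothetical name and the records are illustrative:

```python
def flatten_results(record):
    """Yield one flat dict per news item in an API response record.
    Items missing a key get None instead of raising KeyError."""
    for item in record.get("results") or []:
        yield {
            "id": item.get("id"),
            "title": item.get("title"),
            "summary": item.get("summary"),
            "search_company": record.get("search_company"),
        }


# illustrative records: one response with two items, one with none
sample = [
    {"search_company": "ACME", "results": [
        {"id": 1, "title": "t1", "summary": "s1"},
        {"id": 2, "title": "t2", "summary": "s2"}]},
    {"search_company": "EMPTY CO", "results": []},
]
rows = [row for rec in sample for row in flatten_results(rec)]
print(rows)

# on the cluster (not run here):
# rdd_df = news_data_rdd.flatMap(lambda x: [Row(**d) for d in flatten_results(x)]).toDF()
```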

In [78]:
rdd_df.write.format('csv').mode('overwrite').option('header', True).save('/COE/news_data/rdd_to_df_sample.csv')


An error was encountered:
An error occurred while calling o1726.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOpera

### Data Pre-processing

In [4]:
# import libraries
import json
import re
import requests

from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as sf
import pyspark.sql.types as T
from pyspark.sql.functions import (explode, col, array_contains, udf, lit, encode,
                                   regexp_replace, concat, first)
from pyspark.sql.types import (StructType, StructField, StringType, MapType, FloatType,
                               ArrayType, DoubleType)


In [5]:
# view schema
print(news_data_df.dtypes)
news_data_df.printSchema()  # printSchema() prints directly and returns None


[('count', 'bigint'), ('next', 'string'), ('previous', 'string'), ('results', 'array<struct<attachments:array<string>,channel:string,companies:array<struct<id:bigint,logo:string,name:string,url:string>>,content_types:array<struct<id:bigint,name:string>>,custom_topics:array<string>,duplicates:array<string>,id:bigint,image_url:string,industries:array<struct<id:bigint,name:string>>,language:struct<id:string,name:string>,locations:array<struct<id:bigint,name:string>>,previews:array<string>,pub_date:string,source:struct<id:string,name:string,rank:bigint>,summary:string,title:string,topics:array<struct<id:bigint,name:string>>,url:string>>'), ('search_company', 'string')]
root
 |-- count: long (nullable = true)
 |-- next: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachments: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |--

In [78]:
# # example func
# def extract_from_list(list_input):
#     # lt = [item for l in list_input for item in l]
#     # return(lt)
#     for item in list_input:
#         return(item.items())

# # create UDF
# from pyspark.sql.functions import udf 
# from pyspark.sql.types import ArrayType 
# udf_list_func = udf(lambda x: extract_from_list(x), ArrayType(StringType()))

# # test function with example
# extract_from_list([{'id': 627027, 'url': 'https: //cselectric.co.in/', 'logo': '//112233.contify.com/https://112233.contify.com/images/watchlist/company-ico.png?v=3', 'name': 'C&S Electric Limited'},
# 						  {'id': 1390, 'url': 'https: //new.siemens.com', 'logo': '//112233.contify.com/images/company/logo/siemens-1-29475.png', 'name': 'Siemens AG'}])



dict_items([('id', 627027), ('url', 'https: //cselectric.co.in/'), ('logo', '//112233.contify.com/https://112233.contify.com/images/watchlist/company-ico.png?v=3'), ('name', 'C&S Electric Limited')])

In [6]:
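# select the fields of interest; because `results` is an array of structs,
# each selected column is an array with one entry per news item
df_new = news_data_df.select(col('results.id'), col('results.title'), col('results.url'), col('results.summary'), col('search_company'))
# keep only responses that contain at least one news item
df_new = df_new.filter(sf.size('url') > 0)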
df_new.show(2)


+----------------+--------------------+--------------------+--------------------+--------------------+
|              id|               title|                 url|             summary|      search_company|
+----------------+--------------------+--------------------+--------------------+--------------------+
|[20082425521615]|[Siemens gets CCI...|[https://www.doma...|[Siemens gets CCI...|C&S ELECTRIC LIMI...|
|[20082425521616]|    [ESSAR FORGINGS]|                  []|[Manufacturers & ...|      ESSAR FORGINGS|
+----------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

In [7]:
# prepare input data for model scoring
# each column is still an array (one entry per news item); reduce it to a single string
text_udf = udf(lambda x: "".join(s for s in x if s is not None) if x is not None else None, StringType())
first_udf = udf(lambda x: x[0] if x else None, StringType())  # first element of an array

df_new = df_new.withColumn("summary", text_udf(col("summary")))
df_new = df_new.withColumn("id", first_udf(col("id")))
df_new = df_new.withColumn("title", first_udf(col("title")))
df_new = df_new.withColumn("url", first_udf(col("url")))
# remove curly double quotes from the summary
df_new = df_new.withColumn("summary_new", regexp_replace("summary", '[“”]', ''))
df_new.show(2)

# encode text input: http.client sends a str request body Latin-1 encoded, so re-reading the
# UTF-8 bytes as Latin-1 lets the body reach the wire as UTF-8 (depends on the HTTP library version)
encode_udf = udf(lambda x: x.encode('utf-8').decode('latin-1') if x is not None else None, StringType())
df_new = df_new.withColumn("summary_new", encode_udf(col("summary_new")))
df_new.show(2)

# construct the JSON payload {"news": ..., "name": ...} for model scoring;
# to_json escapes quotes and backslashes, which plain string concatenation does not
df_new_model = df_new.withColumn('model_input_text',
                                 sf.to_json(sf.struct(col('summary_new').alias('news'),
                                                      col('search_company').alias('name'))))
df_new_model.show(2)

# view the full payload without truncation
df_new_model.select('model_input_text').show(2, truncate=False)



+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            id|               title|                 url|             summary|      search_company|         summary_new|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|20082425521615|Siemens gets CCI ...|https://www.domai...|Siemens gets CCI ...|C&S ELECTRIC LIMI...|Siemens gets CCI ...|
|20082425521616|      ESSAR FORGINGS|                null|Manufacturers & S...|      ESSAR FORGINGS|Manufacturers & S...|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            id|               title|                 url|             summary|      search_company|         summary_new|
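Two details of this preparation step can be checked in plain Python. A JSON body assembled by string concatenation becomes invalid as soon as a summary contains a straight double quote or a backslash, whereas a JSON serializer escapes them. The `encode('utf-8').decode('latin-1')` step relies on Python's `http.client` encoding `str` request bodies as ISO-8859-1: re-reading the UTF-8 bytes as Latin-1 makes that final encoding reproduce the UTF-8 bytes exactly. Whether a `str` body actually takes that path depends on the HTTP client library version, so passing bytes explicitly is the more robust option. The sample text below is illustrative:

```python
import json

# illustrative text containing a double quote, a backslash and a non-ASCII character
summary = 'Order won "ahead of schedule" \\ café'
company = "C&S ELECTRIC LIMITED"

# string concatenation: the inner quotes end the JSON string early
naive = '{"news":"' + summary + '","name":"' + company + '"}'
try:
    json.loads(naive)
    naive_ok = True
except json.JSONDecodeError:
    naive_ok = False

# a serializer escapes quotes and backslashes; keep non-ASCII as-is, like Spark's to_json
safe = json.dumps({"news": summary, "name": company}, ensure_ascii=False)

# Latin-1 round trip: encoding the re-read string as Latin-1 gives back the UTF-8 bytes
mojibake = safe.encode("utf-8").decode("latin-1")
wire_bytes = mojibake.encode("latin-1")

print(naive_ok, json.loads(safe)["news"] == summary, wire_bytes == safe.encode("utf-8"))
```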

### Model Scoring

In [8]:
# sentiment analysis model scoring via the REST API

def sentiment_scores(text_input):
    """POST one JSON payload to the scoring endpoint and return (polarity, scores)."""
    response = requests.post("http://52.187.124.32:80/api/v1/service/absa-sentiment-predictor-v2/score",
                             data=text_input,
                             headers={'Content-Type': 'application/json',
                                      'Authorization': 'Bearer 1Q7d5p2SqViNlQbhe6gtHBAiZ5MB58rU'})
    response.raise_for_status()  # surface HTTP errors instead of a KeyError on the body
    response = response.json()
    polarity = response['_doc_polarity']
    scores = response['scores']
    return (polarity, scores)

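`sentiment_scores` is called once per row, and each `requests.post` call sets up a fresh session and connection. A common alternative is to score a whole partition through one `requests.Session`, for example with `mapPartitions`. The sketch below keeps the HTTP call injectable so its logic can be checked without the live endpoint. `score_partition`, `score_rows` and the `post` parameter are hypothetical names, and returning `(None, None)` on failure instead of failing the Spark job is a design choice of this sketch, not the notebook's behaviour:

```python
def score_partition(payloads, post):
    """Score an iterable of JSON payload strings with a single `post` callable.

    `post(payload)` must return the decoded JSON response as a dict. Any error
    while calling or parsing yields (None, None), so one bad item does not abort
    the whole partition.
    """
    for payload in payloads:
        try:
            body = post(payload)
            yield body["_doc_polarity"], body["scores"]
        except Exception:
            yield None, None


# usage with a stand-in for the HTTP call
calls = []


def fake_post(payload):
    calls.append(payload)
    if "fail" in payload:
        return {}  # missing keys -> treated as a failed score
    return {"_doc_polarity": "Neutral",
            "scores": {"Positive": 0.0, "Negative": 0.0, "Neutral": 1.0}}


results = list(score_partition(['{"news":"ok"}', '{"news":"fail"}'], fake_post))
print(results, len(calls))

# Possible wiring on the cluster (assumption, not run here); `url` and `headers` stand for
# the endpoint and headers used in sentiment_scores:
# import requests
# def score_rows(rows):
#     rows = list(rows)
#     with requests.Session() as session:
#         post = lambda p: session.post(url, data=p, headers=headers).json()
#         for row, (polarity, scores) in zip(rows, score_partition((r.model_input_text for r in rows), post)):
#             yield row.id, polarity, scores
# scored_rdd = df_new_model.select('id', 'model_input_text').rdd.mapPartitions(score_rows)
```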

In [10]:
# construct the UDF

# define the return schema
# schema = StructType([StructField("polarity", StringType(), False), StructField("scores", MapType(StringType(), FloatType()), False)])
# print(schema)

# schema = StructType([(StructField("polarity", StringType(), False), 
#                      StructField('pos_score', DoubleType(), False), 
#                      StructField('neg_score', DoubleType(), False), 
#                      StructField('neu_score', DoubleType(), False), True)])
# print(schema)

schema = StructType([
    StructField("polarity", StringType(), False),
    StructField("scores", StructType([
        StructField("Positive", DoubleType(), True),
        StructField("Negative", DoubleType(), True),
        StructField("Neutral", DoubleType(), True),
    ]), True),
])
print(schema)

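# define the UDF
ss_udf = sf.udf(sentiment_scores, schema)

# call the UDF once per row and keep the result next to the input columns;
# cache() so later actions reuse the scores (while cached) instead of calling the REST API again
df_new_model = df_new_model.withColumn('overall_sentiment', ss_udf(col('model_input_text'))).cache()

# expose the struct fields as top-level `polarity` and `scores` columns (used in the cells below)
df_new_model = df_new_model.select('*', 'overall_sentiment.polarity', 'overall_sentiment.scores')

# the predictions on their own
preds_df = df_new_model.select('overall_sentiment')
preds_df.show(3)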



StructType(List(StructField(polarity,StringType,false),StructField(scores,StructType(List(StructField(Positive,DoubleType,true),StructField(Negative,DoubleType,true),StructField(Neutral,DoubleType,true))),true)))
+--------------------+
|   overall_sentiment|
+--------------------+
|[Positive, [0.625...|
|[Negative, [0.0, ...|
|[Negative, [0.0, ...|
+--------------------+
only showing top 3 rows

In [37]:
# inspect the prediction struct

preds_df.printSchema()
preds_df.show(3)
preds_df.take(3)


root
 |-- overall_sentiment: struct (nullable = true)
 |    |-- polarity: string (nullable = false)
 |    |-- scores: struct (nullable = true)
 |    |    |-- Positive: double (nullable = true)
 |    |    |-- Negative: double (nullable = true)
 |    |    |-- Neutral: double (nullable = true)

+--------------------+
|   overall_sentiment|
+--------------------+
|[Positive, [0.625...|
|[Negative, [0.0, ...|
|[Negative, [0.0, ...|
+--------------------+
only showing top 3 rows

[Row(overall_sentiment=Row(polarity='Positive', scores=Row(Positive=0.625, Negative=0.0, Neutral=0.094))), Row(overall_sentiment=Row(polarity='Negative', scores=Row(Positive=0.0, Negative=1.5, Neutral=0.0))), Row(overall_sentiment=Row(polarity='Negative', scores=Row(Positive=0.0, Negative=0.5, Neutral=0.167)))]

In [20]:
df_new_model.select('polarity', 'scores').show(2)
# to split `scores` into Positive/Negative/Neutral columns, select 'scores.*';
# explode() only applies to arrays and maps, not to structs


+--------+-------------------+
|polarity|             scores|
+--------+-------------------+
|Positive|[0.625, 0.0, 0.094]|
|Negative|    [0.0, 1.5, 0.0]|
+--------+-------------------+
only showing top 2 rows

In [22]:
# df_new_model_final = df_new_model.select('id', 'title', 'url', 'summary', 'search_company', 'polarity', 'scores.*')
df_new_model_final = df_new_model.select('id', 'title', 'url', 'summary', 'search_company', 'polarity')


In [28]:
df_new_model_final.show(3)


+--------------+--------------------+--------------------+--------------------+--------------------+--------+
|            id|               title|                 url|             summary|      search_company|polarity|
+--------------+--------------------+--------------------+--------------------+--------------------+--------+
|20082425521615|Siemens gets CCI ...|https://www.domai...|Siemens gets CCI ...|C&S ELECTRIC LIMI...|Positive|
|20082425521616|      ESSAR FORGINGS|                null|Manufacturers & S...|      ESSAR FORGINGS|Negative|
|20082425521621|Kazikhan Engineer...|                null|Kazikhan Engineer...|KAZIKHAN ENGINEER...|Negative|
+--------------+--------------------+--------------------+--------------------+--------------------+--------+
only showing top 3 rows

In [29]:
df_new_model_final.printSchema()  # prints directly; wrapping it in print() only adds "None"

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- search_company: string (nullable = true)
 |-- polarity: string (nullable = true)

### Save DataFrame into HDFS as an RDD

In [None]:
df_new_model_final_rdd = df_new_model_final.rdd
# df_new_model_final_rdd.saveAsTextFile('/COE/news_data/news_rdd/')
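`saveAsTextFile` writes `str()` of each element, so saving this RDD of `Row` objects would store lines of the form `Row(id=..., title=..., ...)`. Because summaries contain commas and quotes, converting each row to a properly quoted CSV line first is one option. A sketch using the standard `csv` module; `to_csv_line` is a hypothetical helper and the sample values are illustrative:

```python
import csv
import io


def to_csv_line(values):
    """Render one record as a single CSV line. Fields containing commas or quotes
    are quoted (csv.QUOTE_MINIMAL); None is written as an empty field."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="").writerow(values)
    return buf.getvalue()


line = to_csv_line(["20082425521615", 'A "quoted", comma-separated title', None, "Positive"])
print(line)

# on the cluster (not run here):
# df_new_model_final.rdd.map(lambda r: to_csv_line(list(r))).saveAsTextFile('/COE/news_data/news_rdd/')
```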

### Save DataFrame into Hive Table

In [59]:
# register a session-scoped temporary view (not persisted to the Hive metastore) and query it with SQL
df_new_model.createOrReplaceTempView('df_new_model_temp')
polarity_df = spark.sql('select polarity from df_new_model_temp')
polarity_df.show()
# polarity_df.write.mode('overwrite').json('/COE/news_data/sentiment_scores.json')


+--------+
|polarity|
+--------+
|Positive|
|Negative|
|Negative|
|Positive|
|Positive|
|Positive|
|Negative|
+--------+

In [51]:
df_new_model_final.take(3)  # take() needs the number of rows to return



In [None]:
# save the DataFrame to HDFS as CSV; Spark writes a directory of part files at this path

df_new_model_final.write.format('csv').mode('overwrite').option('header', True).save('/COE/news_data/sentiment_model_scores.csv')