In [None]:
# !pip install kafka-python

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import json

def transform_json_files(directory_path):
    """Merge all dictionary JSON files of a directory into one flat JSON array.

    Every regular file located directly inside ``directory_path`` is parsed as
    a JSON object that maps a word to its entry. Each entry is reshaped:

    * ``WORD`` is added and holds the word (the key of the entry),
    * ``PARTS_OF_SPEECH`` is added and holds the first element of every
      non-empty meaning found under ``MEANINGS``,
    * ``MEANINGS`` itself is removed.

    The reshaped entries of all files are written as a single JSON array to
    ``<directory_path>/spark/all_words.json``. The ``spark`` sub-directory is
    created when it does not exist yet.

    Args:
        directory_path: Directory that contains the source JSON files.

    Returns:
        None. The result is written to disk.

    Raises:
        KeyError: If an entry has no ``MEANINGS`` key.
        json.JSONDecodeError: If a file in the directory is not valid JSON.
    """
    records = []

    # Sorted so the order of the output does not depend on the file system.
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)

        # Sub-directories (including the "spark" output directory) are skipped.
        if not os.path.isfile(file_path):
            continue

        # JSON is UTF-8 by specification; do not rely on the platform default.
        with open(file_path, "r", encoding="utf-8") as file:
            file_json = json.load(file)

        for word, entry in file_json.items():
            meanings = entry.pop('MEANINGS')
            entry['WORD'] = word
            entry['PARTS_OF_SPEECH'] = [
                meaning[0] for meaning in meanings.values() if len(meaning) > 0
            ]
            records.append(entry)

    # The output directory may not exist on a fresh checkout.
    output_dir = os.path.join(directory_path, "spark")
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "all_words.json"), "w", encoding="utf-8") as f:
        json.dump(records, f)

# Flatten the raw dictionary files into the single JSON file that is read below.
transform_json_files("dictionary_script/data/Dictionary JSON")

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
# Load the merged word list; no schema is passed, so Spark derives it from the data.
df = spark.read.json("dictionary_script/data/Dictionary JSON/spark/all_words.json")
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- ANTONYMS: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- PARTS_OF_SPEECH: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- SYNONYMS: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- WORD: string (nullable = true)



In [2]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

# Expose the DataFrame to Spark SQL under the view name "tableA".
df.createOrReplaceTempView("tableA")

def get_number_of_adjectives(list_of_parts):
    """Count how many entries of a parts-of-speech list equal "Adjective".

    Args:
        list_of_parts: Iterable of part-of-speech strings, or None. None is
            what a Python UDF receives for a NULL cell of a nullable column.

    Returns:
        The number of elements that are exactly the string "Adjective"
        (case-sensitive); 0 for None or an empty list.
    """
    # A NULL array cell arrives as None; treat it as "no meanings".
    if list_of_parts is None:
        return 0
    return sum(1 for part in list_of_parts if part == "Adjective")

# Make the Python function callable from Spark SQL as numberOfAdjectives;
# its result is declared as an integer column.
spark.udf.register("numberOfAdjectives", get_number_of_adjectives, IntegerType())

# For every word, select its parts of speech and the number of them that are adjectives.
result = spark.sql("""
select word, 
    parts_of_speech, 
    numberOfAdjectives(parts_of_speech) as adjective_meanings 
        from tableA
""")

# Preview the first five rows.
result.show(5)

# Count the words that have at least one meaning in which they are used as
# an adjective, and compare that with the total number of words.
adjectives_count = result.filter(result.adjective_meanings > 0).count()
all_words_count = result.count()
print(f"There are {adjectives_count} words with at least 1 adjectival meaning out of {all_words_count} words.")

+----+---------------+------------------+
|word|parts_of_speech|adjective_meanings|
+----+---------------+------------------+
|   A|   [Noun, Noun]|                 0|
|A.D.|             []|                 0|
|A.M.|             []|                 0|
|  AA|         [Noun]|                 0|
| AAA|             []|                 0|
+----+---------------+------------------+
only showing top 5 rows

There are 12801 words with at least 1 adjectival meaning out of 121340 words.


In [1]:
def count_and_send_back(key, value):
    """Count the words of one Kafka message and publish the counts.

    The message payload is decoded into a dict, its ``reviews`` entry (a list
    of word lists, possibly given as a string holding a Python list literal)
    is flattened, the occurrences of every distinct word are counted with a
    Spark RDD, and the payload, extended by ``counted_words``, is sent to the
    ``adjectives_counted`` topic.

    Args:
        key: Message key. Not used by this handler.
        value: Message payload as ``str``: a JSON document or the ``repr`` of
            a Python dict (single-quoted).

    Returns:
        None.

    Raises:
        json.JSONDecodeError: If the payload cannot be decoded by any of the
            supported strategies.
        KeyError: If the decoded payload has no ``reviews`` entry.
    """
    import ast
    import json

    from pyspark import SparkContext
    from Kafka_Helpers import Producer

    # Decode the payload. Strict JSON is tried first so that apostrophes
    # inside valid JSON strings (e.g. "don't") are left untouched. The legacy
    # quote replacement is kept for single-quoted payloads, and a Python
    # literal parse covers dict reprs that mix both quote styles.
    try:
        payload = json.loads(value)
    except ValueError:
        try:
            payload = json.loads(value.replace("\'", "\""))
        except ValueError as legacy_error:
            try:
                payload = ast.literal_eval(value)
            except (ValueError, SyntaxError, TypeError):
                # Report the same error type the legacy parsing raised.
                raise legacy_error

    # Create Spark context (or reuse the running one).
    sc = SparkContext.getOrCreate()

    # 'reviews' may arrive as a string holding a list literal; parse it only
    # in that case instead of round-tripping an existing list through str().
    reviews = payload['reviews']
    if isinstance(reviews, str):
        reviews = ast.literal_eval(reviews)

    # Flatten the list of word lists into one list of words.
    words = [word for review in reviews for word in review]

    # Distribute the words over 4 partitions and count every distinct word.
    words_rdd = sc.parallelize(words, 4)
    word_counts = (
        words_rdd
        .map(lambda w: (w, 1))
        .reduceByKey(lambda x, y: x + y)
        .collect()
    )

    # Send the payload plus the (word, count) pairs to another Kafka topic.
    producer = Producer(server='kafka', port=9092)
    payload['counted_words'] = word_counts
    producer.send('adjectives_counted', 'mykey', json.dumps(payload))
    
from Kafka_Helpers import Consumer, Producer

# Create a consumer for the 'adjectives' topic on kafka:9092 with count_and_send_back
# as its handler (presumably called once per message; see Kafka_Helpers to confirm).
Consumer(server='kafka', port=9092, topic_name='adjectives', handler=count_and_send_back)
# Alternative consumer, disabled; print_key_value is not defined in this part of the notebook.
# Consumer(server='kafka', port=9092, topic_name='topic_d', handler=print_key_value)
    

Waiting for new events...


<Kafka_Helpers.Consumer at 0x7f9eec04f7c0>