In [1]:
# importing required modules
import boto3

# AWS region to use (chosen as the nearest one)
region = "eu-west-1"

# S3 resource that performs the downloads below; it is created with `region`
# so the region setting actually applies to the object doing the transfers.
s3 = boto3.resource('s3', region_name=region)

# low-level S3 client for the same region (not used by the downloads below)
s3_client = boto3.client('s3', region_name=region)

# Download both CSVs under their own names into the current working
# directory; the later spark.read.csv(...) calls open them by these same
# relative file names, so no machine-specific absolute path is needed.
bucket = s3.Bucket('blossom-data-engs')
for file_name in ('alldata.csv', 'companies.csv'):
    bucket.download_file(file_name, file_name)

In [2]:
#loading the relevant packages
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Tokenizer

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [3]:
# Both CSV files are parsed with the same reader options: the first row is the
# header, column types are inferred, '"' is the escape character and a record
# may span several lines.
_csv_options = dict(header=True, inferSchema=True, escape='"', multiLine=True)

companies = spark.read.csv('companies.csv', **_csv_options)

alldata = spark.read.csv('alldata.csv', **_csv_options)

In [4]:
# Join companies to alldata: a pair of rows matches when either the company
# name or the description is equal on both sides.
_same_company = companies['company name'] == alldata.company
_same_description = companies.description == alldata.description
new = companies.join(alldata, _same_company | _same_description)

In [5]:
# List the joined frame's column names. Note that 'description' appears twice,
# once from each input frame, because both sides of the join carry that column.
new.columns

['ticker',
 'company name',
 'short name',
 'industry',
 'description',
 'website',
 'logo',
 'ceo',
 'exchange',
 'market cap',
 'sector',
 'tag 1',
 'tag 2',
 'tag 3',
 'position',
 'company',
 'description',
 'reviews',
 'location']

Questions D to E

In [6]:
# This function tokenizes the 'description' column of `companies` and
# generates unigrams and bigrams from it.

def generate_ngram():
    """Add 'tokens', 'bigrams' and 'unigrams' columns to the global ``companies``.

    Rows whose 'description' or 'industry' is null are removed first. The
    enriched frame is stored back into the global ``companies`` and the first
    five rows of the two n-gram columns are printed.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    global companies

    # DataFrames are immutable: filter() returns a new frame, so its result
    # has to be kept, otherwise the null rows are never actually removed.
    cleaned = companies.filter(
        companies.description.isNotNull() & companies.industry.isNotNull()
    )

    # Remove columns left over from an earlier call so that re-running this
    # function does not fail on an already existing output column
    # (drop() ignores names that are not present).
    cleaned = cleaned.drop('tokens', 'bigrams', 'unigrams')

    tokenizer = Tokenizer(inputCol='description', outputCol='tokens')
    bigrams = NGram(n=2, inputCol='tokens', outputCol='bigrams')
    unigrams = NGram(n=1, inputCol='tokens', outputCol='unigrams')

    tokenized = tokenizer.transform(cleaned)
    companies = unigrams.transform(bigrams.transform(tokenized))
    return companies.select(['bigrams', 'unigrams']).show(5)

In [7]:
# Run the n-gram step on `companies`; it prints the first five rows of the
# 'bigrams' and 'unigrams' columns.
generate_ngram()

+--------------------+--------------------+
|             bigrams|            unigrams|
+--------------------+--------------------+
|[agilent technolo...|[agilent, technol...|
|[alcoa corp, corp...|[alcoa, corp, is,...|
|[altaba inc, inc ...|[altaba, inc, is,...|
|[aac holdings, ho...|[aac, holdings, i...|
|[the investment, ...|[the, investment,...|
+--------------------+--------------------+
only showing top 5 rows



In [8]:
# This function creates a dataframe with 3 columns (bigrams, industry & frequency).

def dataframe_bigrams(limit=1):
    """Print how often each bigram occurs per industry in ``companies``.

    Expects the global ``companies`` frame to already have a 'bigrams'
    column (added by ``generate_ngram``).

    Args:
        limit: Number of ``companies`` rows to take before counting.
            Defaults to 1, the value that used to be hard-coded; pass
            ``None`` to count over every row.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    rows = companies.select('bigrams', 'industry')
    if limit is not None:
        rows = rows.limit(limit)

    frequencies = (
        rows
        # one output row per (industry, bigram) occurrence
        .select('industry', F.explode('bigrams').alias('bigrams'))
        .groupBy('bigrams', 'industry')
        .count()
        .withColumnRenamed('count', 'frequency')
        .orderBy("frequency", ascending=True)
    )
    return frequencies.show()
dataframe_bigrams()

+--------------------+--------------------+---------+
|             bigrams|            industry|frequency|
+--------------------+--------------------+---------+
|agilent technologies|Medical Diagnosti...|        1|
|    technologies inc|Medical Diagnosti...|        1|
|              inc is|Medical Diagnosti...|        1|
|          is engaged|Medical Diagnosti...|        1|
|          engaged in|Medical Diagnosti...|        1|
|             in life|Medical Diagnosti...|        1|
|      life sciences,|Medical Diagnosti...|        1|
|sciences, diagnos...|Medical Diagnosti...|        1|
|     diagnostics and|Medical Diagnosti...|        1|
|         and applied|Medical Diagnosti...|        1|
|    applied chemical|Medical Diagnosti...|        1|
|   chemical markets.|Medical Diagnosti...|        1|
|        markets. the|Medical Diagnosti...|        1|
|         the company|Medical Diagnosti...|        1|
|    company provides|Medical Diagnosti...|        1|
|provides application|Medica

In [10]:
# This function creates a dataframe with 3 columns (unigrams, industry & frequency).

def dataframe_unigrams(limit=1):
    """Print how often each unigram occurs per industry in ``companies``.

    Expects the global ``companies`` frame to already have a 'unigrams'
    column (added by ``generate_ngram``).

    Args:
        limit: Number of ``companies`` rows to take before counting.
            Defaults to 1, the value that used to be hard-coded; pass
            ``None`` to count over every row.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    rows = companies.select('unigrams', 'industry')
    if limit is not None:
        rows = rows.limit(limit)

    frequencies = (
        rows
        # one output row per (industry, unigram) occurrence
        .select('industry', F.explode('unigrams').alias('unigrams'))
        .groupBy('unigrams', 'industry')
        .count()
        .withColumnRenamed('count', 'frequency')
        .orderBy("frequency", ascending=True)
    )
    return frequencies.show()
dataframe_unigrams()

+------------+--------------------+---------+
|    unigrams|            industry|frequency|
+------------+--------------------+---------+
|     agilent|Medical Diagnosti...|        1|
|technologies|Medical Diagnosti...|        1|
|         inc|Medical Diagnosti...|        1|
|          is|Medical Diagnosti...|        1|
|     engaged|Medical Diagnosti...|        1|
|          in|Medical Diagnosti...|        1|
|        life|Medical Diagnosti...|        1|
|   sciences,|Medical Diagnosti...|        1|
| diagnostics|Medical Diagnosti...|        1|
|     applied|Medical Diagnosti...|        1|
|    chemical|Medical Diagnosti...|        1|
|    markets.|Medical Diagnosti...|        1|
|     company|Medical Diagnosti...|        1|
|    provides|Medical Diagnosti...|        1|
| application|Medical Diagnosti...|        1|
|     focused|Medical Diagnosti...|        1|
|   solutions|Medical Diagnosti...|        1|
|        that|Medical Diagnosti...|        1|
|     include|Medical Diagnosti...

In [11]:
# This function tokenizes the 'description' column of the alldata set and
# generates unigrams and bigrams from it.

def genall():
    """Add 'tokens', 'bigrams' and 'unigrams' columns to the global ``alldata``.

    Rows whose 'description' or 'location' is null are removed first. The
    enriched frame is stored back into the global ``alldata`` and the first
    five rows of the two n-gram columns are printed.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    global alldata

    # DataFrames are immutable: filter() returns a new frame, so its result
    # has to be kept, otherwise the null rows are never actually removed.
    cleaned = alldata.filter(
        alldata.description.isNotNull() & alldata.location.isNotNull()
    )

    # Remove columns left over from an earlier call so that re-running this
    # function does not fail on an already existing output column
    # (drop() ignores names that are not present).
    cleaned = cleaned.drop('tokens', 'bigrams', 'unigrams')

    tokenizer = Tokenizer(inputCol='description', outputCol='tokens')
    bigrams = NGram(n=2, inputCol='tokens', outputCol='bigrams')
    unigrams = NGram(n=1, inputCol='tokens', outputCol='unigrams')

    tokenized = tokenizer.transform(cleaned)
    alldata = unigrams.transform(bigrams.transform(tokenized))
    return alldata.select(['bigrams', 'unigrams']).show(5)
genall()

+--------------------+--------------------+
|             bigrams|            unigrams|
+--------------------+--------------------+
|[development dire...|[development, dir...|
|[job description,...|[job, description...|
|[growing company,...|[growing, company...|
|[department: prog...|[department:, pro...|
|[description the,...|[description, the...|
+--------------------+--------------------+
only showing top 5 rows



In [12]:
# This function creates a dataframe with 3 columns (unigrams, city & frequency).
def genall_unigrams(limit=1):
    """Print how often each unigram occurs per city in ``alldata``.

    The city is the part of 'location' before the first comma. Expects the
    global ``alldata`` frame to already have a 'unigrams' column (added by
    ``genall``).

    Args:
        limit: Number of ``alldata`` rows to take before counting.
            Defaults to 1, the value that used to be hard-coded; pass
            ``None`` to count over every row.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    with_city = alldata.withColumn('city', F.split(alldata['location'], ',')[0])
    rows = with_city.select(['unigrams', 'city'])
    if limit is not None:
        rows = rows.limit(limit)

    frequencies = (
        rows
        # one output row per (city, unigram) occurrence
        .select('city', F.explode('unigrams').alias('unigrams'))
        .groupBy(['unigrams', 'city'])
        .count()
        .orderBy("count", ascending=False)
        .withColumnRenamed('count', 'frequency')
    )
    return frequencies.show()

In [13]:
# Print the per-city unigram frequencies, most frequent first.
genall_unigrams()

+-----------+-------+---------+
|   unigrams|   city|frequency|
+-----------+-------+---------+
|        and|Atlanta|       20|
|        the|Atlanta|        9|
|        als|Atlanta|        9|
|         to|Atlanta|        8|
|development|Atlanta|        7|
|         in|Atlanta|        6|
|         at|Atlanta|        5|
|         as|Atlanta|        5|
|         of|Atlanta|        4|
|        for|Atlanta|        4|
|          a|Atlanta|        4|
|  including|Atlanta|        4|
|       with|Atlanta|        4|
|fundraising|Atlanta|        3|
|  institute|Atlanta|        3|
|   position|Atlanta|        3|
|         is|Atlanta|        3|
|    therapy|Atlanta|        3|
|           |Atlanta|        3|
|       this|Atlanta|        3|
+-----------+-------+---------+
only showing top 20 rows



In [14]:
# This function creates a dataframe with 3 columns (bigrams, city & frequency).

def genall_bigrams(limit=1):
    """Print how often each bigram occurs per city in ``alldata``.

    The city is the part of 'location' before the first comma. Expects the
    global ``alldata`` frame to already have a 'bigrams' column (added by
    ``genall``).

    Args:
        limit: Number of ``alldata`` rows to take before counting.
            Defaults to 1, the value that used to be hard-coded; pass
            ``None`` to count over every row.

    Returns:
        None -- the value of ``DataFrame.show``, which only prints.
    """
    with_city = alldata.withColumn('city', F.split(alldata['location'], ',')[0])
    rows = with_city.select(['bigrams', 'city'])
    if limit is not None:
        rows = rows.limit(limit)

    frequencies = (
        rows
        # one output row per (city, bigram) occurrence
        .select('city', F.explode('bigrams').alias('bigrams'))
        .groupBy(['bigrams', 'city'])
        .count()
        .orderBy("count", ascending=False)
        .withColumnRenamed('count', 'frequency')
    )
    return frequencies.show()
genall_bigrams()

+--------------------+-------+---------+
|             bigrams|   city|frequency|
+--------------------+-------+---------+
|development insti...|Atlanta|        3|
| therapy development|Atlanta|        3|
|         als therapy|Atlanta|        3|
|              tdi is|Atlanta|        2|
|development director|Atlanta|        2|
|             will be|Atlanta|        2|
|       prospects and|Atlanta|        2|
|        patients and|Atlanta|        2|
|             als tdi|Atlanta|        2|
|       position will|Atlanta|        2|
|      in fundraising|Atlanta|        2|
|          ability to|Atlanta|        2|
| institute (als.net)|Atlanta|        2|
|           and their|Atlanta|        2|
|             well as|Atlanta|        2|
|             as well|Atlanta|        2|
|       this position|Atlanta|        2|
|        director als|Atlanta|        1|
|       institute has|Atlanta|        1|
|              has an|Atlanta|        1|
+--------------------+-------+---------+
only showing top