In [2]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import NGram


In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
companies = spark.read.csv(
            "companies.csv", 
            header=True, inferSchema=True)
alldat = spark.read.csv('alldata.csv',
                       header=True, inferSchema=True)

In [5]:
companies.count()


7310

In [6]:
companies.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|              ticker|        company name|          short name|            industry|         description|             website|     logo|                 ceo|            exchange| market cap|            sector|             tag 1|               tag 2|               tag 3|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+-----------+------------------+------------------+--------------------+--------------------+
|                   A|Agilent Technolog...|             Agilent|Medical Diagnosti...|Agilent Technolog...|http://www.agilen...|    A.png| Michael R. McMullen|New York Stock Ex...|24218

In [7]:
alldat.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|            position|             company|         description|             reviews|            location|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Development Director|             ALS TDI|Development Director|                null|                null|
|ALS Therapy Devel...| the Development ...| generating aware...| prospects and do...|                 GA.|
|       Requirements:|                null|                null|                null|                null|
|Bachelor's Degree...| written and pres...| as well as the a...|         spreadsheet|            database|
|About ALS Therapy...|                null|                null|                null|                null|
|The ALS Therapy D...| the charity unde...|  based in Cambridge|                  MA| has served as on...|
|            To Apply|               

In [8]:
companies.columns

['ticker',
 'company name',
 'short name',
 'industry',
 'description',
 'website',
 'logo',
 'ceo',
 'exchange',
 'market cap',
 'sector',
 'tag 1',
 'tag 2',
 'tag 3']

In [9]:
alldat.columns

['position', 'company', 'description', 'reviews', 'location']

In [10]:
all_companies = companies.join(alldat,companies['company name'] == alldat['company'], 'inner')

In [11]:
all_companies.show(5)

+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+---------+---------+---------+----------+------+-----+-----+-----+--------------------+---------------+--------------------+-------------------+--------------------+
|              ticker|   company name|          short name|            industry|         description|             website|     logo|      ceo| exchange|market cap|sector|tag 1|tag 2|tag 3|            position|        company|         description|            reviews|            location|
+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+---------+---------+---------+----------+------+-----+-----+-----+--------------------+---------------+--------------------+-------------------+--------------------+
| The fund general...|         design| production or di...|http://www.invesc...|             PSJ.png|                null|NYSE Arca|3002

In [12]:
def ngram_gen (dataset,column_name):
    tokenizer = Tokenizer(inputCol = column_name, outputCol = 'token')
    ngram = NGram(n = 2, inputCol = 'token', outputCol = 'ngram')
    unigram = tokenizer.transform(dataset)
    bigram = ngram.transform(unigram)
    return unigram,bigram
    

In [13]:
null=alldat.select('location').dropna()
dropped=null.select(F.split('location', ' ')[0].alias('city'))
dropped_null = dropped[dropped.city.contains(',')]
dropped_null.show(5)


+--------+
|    city|
+--------+
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
|Atlanta,|
+--------+
only showing top 5 rows



In [14]:
dropped_null.createOrReplaceTempView('city')
city_count = spark.sql("SELECT city, COUNT(city) AS frequency FROM city GROUP BY city ORDER BY frequency ")
city_count.show(15)


+-----------+---------+
|       city|frequency|
+-----------+---------+
|Parsippany,|        1|
|  Berkeley,|        1|
|  Lynbrook,|        1|
| Allendale,|        1|
|Burlingame,|        1|
|  Martinez,|        1|
| Fairfield,|        1|
|     Union,|        1|
| Manhasset,|        1|
|  Richmond,|        2|
|   Hayward,|        2|
|Emeryville,|        2|
| Manhattan,|        2|
|   Belmont,|        2|
|  Brooklyn,|        3|
+-----------+---------+
only showing top 15 rows



In [15]:
companies.createOrReplaceTempView('industry')
industry_count = spark.sql("SELECT industry, COUNT(industry) AS frequency FROM industry GROUP BY industry ORDER BY frequency ")
industry_count.show(10)

+--------------------+---------+
|            industry|frequency|
+--------------------+---------+
|                null|        0|
| they provide inv...|        1|
| of the S&P 500 I...|        1|
|             DHS.png|        1|
|              grains|        1|
| of its total ass...|        1|
| such as strong R...|        1|
| such as converti...|        1|
|             PIZ.png|        1|
| and small U.S. c...|        1|
+--------------------+---------+
only showing top 10 rows



In [16]:
def sparkset ():
    city_freq_ngram = ngram_gen(city_count, 'city')[1]
    companies_ngram = ngram_gen(industry_count, 'industry')[1]
    return city_freq_ngram , companies_ngram

In [17]:
sparkset()[0].show()

+-----------+---------+-------------+-----+
|       city|frequency|        token|ngram|
+-----------+---------+-------------+-----+
| Manhasset,|        1| [manhasset,]|   []|
|     Union,|        1|     [union,]|   []|
|  Lynbrook,|        1|  [lynbrook,]|   []|
|  Berkeley,|        1|  [berkeley,]|   []|
|  Martinez,|        1|  [martinez,]|   []|
| Allendale,|        1| [allendale,]|   []|
| Fairfield,|        1| [fairfield,]|   []|
|Burlingame,|        1|[burlingame,]|   []|
|Parsippany,|        1|[parsippany,]|   []|
|  Richmond,|        2|  [richmond,]|   []|
|Emeryville,|        2|[emeryville,]|   []|
|   Hayward,|        2|   [hayward,]|   []|
|   Belmont,|        2|   [belmont,]|   []|
| Manhattan,|        2| [manhattan,]|   []|
|  Brooklyn,|        3|  [brooklyn,]|   []|
|   Alameda,|        4|   [alameda,]|   []|
|   Boulder,|        8|   [boulder,]|   []|
|   Oakland,|        9|   [oakland,]|   []|
|   Redmond,|       17|   [redmond,]|   []|
| Sunnyvale,|       20| [sunnyva