In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
companies = spark.read.csv(
    "data/companies.csv", 
    header=True, inferSchema=True)

In [5]:
jobs = spark.read.csv(
    "data/alldata.csv", 
    header=True, inferSchema=True)

In [26]:
jobs.select('location').show(5, False)

+---------+
|location |
+---------+
|null     |
| GA.     |
|null     |
| database|
|null     |
+---------+
only showing top 5 rows



### 1: JOIN SPARK DATAFRAMES

In [None]:
# rename description column in companies to desc

In [17]:
joined = companies.join(jobs, companies["company name"] == jobs["company"])

In [18]:
joined.columns

['ticker',
 'company name',
 'short name',
 'industry',
 'description',
 'website',
 'logo',
 'ceo',
 'exchange',
 'market cap',
 'sector',
 'tag 1',
 'tag 2',
 'tag 3',
 'position',
 'company',
 'description',
 'reviews',
 'location']

### 2: SPARK NGRAM, TOKENIZER

In [11]:
from pyspark.ml.feature import NGram, Tokenizer

In [12]:
def ngram_fn(n, j):
    tokens = Tokenizer(inputCol='description', outputCol='tokens')
    jobs = tokens.transform(j)
    nn = NGram(n=n, inputCol='tokens', outputCol="ngrams")
    jobs = nn.transform(jobs)
    return jobs


In [13]:
jobs2 = ngram_fn(2, jobs)

In [15]:
jobs2.columns

['position',
 'company',
 'description',
 'reviews',
 'location',
 'tokens',
 'ngrams']

In [19]:
jobs3 = jobs2.filter(jobs2.description.isNotNull())

In [20]:
jobs3.select("tokens").show(2, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tokens                                                                                                                                                                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[development, director]                                                                                                                                                 |
|[, generating, awareness, of, als, tdi;, outreach, including, attending, and, speaking, at, events, as, well, as, personally, cultivates, relationships, with, patients]|
+------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
jobs3.select('ngrams').show(2, False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ngrams                                                                                                                                                                                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[development director]                                                        

### 3 CITY NGRAM FREQUENCY

In [22]:
new_jobs = jobs3.select(
    'description','tokens', 'ngrams','location', F.split(jobs3['location'], ',')[0].alias('city'))


In [24]:
jobs3.select('location').show()

+--------------------+
|            location|
+--------------------+
|                null|
|                 GA.|
|            database|
| has served as on...|
|                null|
|                null|
| has an open posi...|
|                null|
| or other quantit...|
| Algorithms/Incor...|
|                null|
|                null|
| regulatory and p...|
|                null|
|                null|
|                null|
|                null|
|                null|
| impactful solutions|
|                null|
+--------------------+
only showing top 20 rows



In [33]:
new_jobs = new_jobs.filter(new_jobs.city.isNotNull())

In [34]:
# q = new_jobs.select(
#     'city',
#     F.explode('ngrams').alias('ngrams')).groupBy(['city', 'ngrams']).count()

In [44]:
def frequeciesCity(df):
    l = []
    df1 = df.select(
        'city',
        F.explode('ngrams').alias('ngrams')).groupBy(['city', 'ngrams']).count()
    return df1

In [45]:
city_freq = frequeciesCity(new_jobs)

In [46]:
city_freq.show(5)

+--------------------+----------------+-----+
|                city|          ngrams|count|
+--------------------+----------------+-----+
|             Atlanta|      the impact|    2|
|             Atlanta|analytical tools|    1|
|             back-up|and programming;|    1|
| guidance and ins...|  advisable. may|    1|
|                etc.|  missing values|    1|
+--------------------+----------------+-----+
only showing top 5 rows



In [52]:
city_freq = city_freq.withColumnRenamed('count', 'counts')
# city_freq = city_freq.orderBy(q.counts.desc())

### 4 INDUSTRY NGRAM FREQUENCY

In [48]:
def frequeciesIndustry(df):
    l = []
    df1 = df.select(
        'industry',
        F.explode('ngrams').alias('ngrams')).groupBy(['industry', 'ngrams']).count()
    return df1

In [None]:
indus_freq = frequeciesIndustry(joined)