AYISHETU SEIDU

BLOSSOM ACADEMY

DATA ENGINEERING - SPRING 2020

DATA TRANSFORMATION WITH PYSPARK

In [21]:
# Library imports. pandas, udf, explode, BooleanType and Window are not used
# in the cells shown here; they are kept in case other cells rely on them.
import boto3
import pandas as pd

from pyspark.ml.feature import NGram, Tokenizer
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import BooleanType

# S3 client (pinned to the eu-west-1 region) used to fetch the raw CSV files.
s3 = boto3.client("s3", region_name="eu-west-1")

# Reuse the active Spark session, or start a new one if none exists yet.
spark = SparkSession.builder.getOrCreate()

In [22]:
# Download the companies file and the job postings file from the
# "blossom-data-engs" S3 bucket, saving each locally under its key name.
for s3_key in ("companies.csv", "alldata.csv"):
    s3.download_file("blossom-data-engs", s3_key, s3_key)

In [23]:
# Load the companies file into `comp`. Quoted fields may span several lines
# (multiLine), and '"' is the escape character inside quoted fields.
comp = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)
    .option("escape", '"')
    .csv("companies.csv")
)
# Show the column names so the header can be inspected.
comp.columns

['ticker',
 'company name',
 'short name',
 'industry',
 'description',
 'website',
 'logo',
 'ceo',
 'exchange',
 'market cap',
 'sector',
 'tag 1',
 'tag 2',
 'tag 3\r']

In [4]:
# Normalise the companies column names. First strip surrounding whitespace
# from every header name: the last header arrives as 'tag 3\r' (a carriage
# return left over from the file's line endings). Stripping all names means
# the renames below still apply if the file is re-saved with different line
# endings. This matters because withColumnRenamed silently does nothing when
# the old name is absent.
comp = comp.toDF(*[c.strip() for c in comp.columns])

# Replace names containing spaces with underscore-style identifiers.
comp_renames = {
    'company name': 'company_name',
    'short name': 'short_name',
    'market cap': 'market_cap',
    'tag 1': 'tag1',
    'tag 2': 'tag2',
    'tag 3': 'tag3',
}
for old_name, new_name in comp_renames.items():
    comp = comp.withColumnRenamed(old_name, new_name)

# Lower-case the company name so it can be matched against the job postings'
# company column, which is lower-cased the same way.
comp = comp.withColumn("company_name", F.lower(F.col("company_name")))

In [5]:
# Load the job postings file into `jobs`, using the same CSV options as the
# companies file: header row, inferred types, multi-line quoted fields, and
# '"' as the escape character.
jobs = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("multiLine", True)
    .option("escape", '"')
    .csv("alldata.csv")
)
# Show the column names so the header can be inspected.
jobs.columns

['position', 'company', 'description', 'reviews', 'location\r']

In [6]:
# Strip surrounding whitespace from every header name. The last header
# arrives as 'location\r' (a carriage return left over from the file's line
# endings), and withColumnRenamed silently does nothing if the exact old name
# is absent.
jobs = jobs.toDF(*[c.strip() for c in jobs.columns])

# Rename 'description', which also exists in the companies data, so the join
# has no ambiguous column names. Give the location column a matching prefix.
jobs = jobs.withColumnRenamed('description', 'job_description')
jobs = jobs.withColumnRenamed('location', 'job_location')

jobs.columns

['position', 'company', 'job_description', 'reviews', 'job_location']

In [7]:
# Add a 'city' column holding the part of job_location before the first comma.
# Leading/trailing whitespace is removed. The header row ended in '\r', so
# unquoted last-column values (e.g. a location with no comma) may carry a
# trailing carriage return too; Java-regex \s also matches \r.
# The expression is built inline so the name `city` is not bound here to a
# Column object; a later cell reuses `city` for a DataFrame.
jobs = jobs.withColumn(
    'city',
    F.regexp_replace(F.split(F.col('job_location'), ',')[0], r'^\s+|\s+$', ''),
)

In [9]:
# Lower-case the company name so it can be matched against
# comp.company_name, which is lower-cased the same way. (Only the company
# column is changed here; job_description is lower-cased after the join.)
jobs = jobs.withColumn("company", F.lower(F.col("company")))

In [12]:
# Inner-join each job posting to its company's row by matching the
# lower-cased company names. Postings without a matching company are dropped.
company_jobs = jobs.join(
    comp,
    on=jobs['company'] == comp['company_name'],
    how='inner',
)

# List the combined columns to check the result of the join.
company_jobs.columns

['position',
 'company',
 'job_description',
 'reviews',
 'job_location',
 'city',
 'ticker',
 'company_name',
 'short_name',
 'industry',
 'description',
 'website',
 'logo',
 'ceo',
 'exchange',
 'market_cap',
 'sector',
 'tag1',
 'tag2',
 'tag3']

In [13]:
# Replace job_description with its lower-cased text.
company_jobs = company_jobs.withColumn(
    "job_description", F.lower(company_jobs["job_description"])
)

In [14]:
def create_ngrams(dataset, column, num_of_n, *, tokens_col='tokens', ngrams_col='ngrams'):
    """Tokenise a text column and append its n-grams as a new array column.

    Tokenizer and NGram are the pyspark.ml.feature classes imported at the
    top of the notebook. Tokens are split on whitespace, so punctuation stays
    attached to words (e.g. 'integration,' in the unigram output).

    Parameters
    ----------
    dataset : pyspark.sql.DataFrame
        Input DataFrame containing ``column``.
    column : str
        Name of the string column to tokenise.
    num_of_n : int
        Number of consecutive tokens per n-gram (1 = unigrams, 2 = bigrams).
    tokens_col : str, keyword-only
        Name of the intermediate token array column (default ``'tokens'``).
    ngrams_col : str, keyword-only
        Name of the output n-gram array column (default ``'ngrams'``).

    Returns
    -------
    pyspark.sql.DataFrame
        ``dataset`` with ``tokens_col`` and ``ngrams_col`` appended.
    """
    tokenizer = Tokenizer(inputCol=column, outputCol=tokens_col)
    ngram = NGram(n=num_of_n, inputCol=tokens_col, outputCol=ngrams_col)
    return ngram.transform(tokenizer.transform(dataset))

In [15]:
# Smoke-test create_ngrams by building bigrams from the job descriptions.
# The notebook displays the resulting DataFrame's column list and types.
create_ngrams(company_jobs, column='job_description', num_of_n=2)

DataFrame[position: string, company: string, job_description: string, reviews: int, job_location: string, city: string, ticker: string, company_name: string, short_name: string, industry: string, description: string, website: string, logo: string, ceo: string, exchange: string, market_cap: bigint, sector: string, tag1: string, tag2: string, tag3: string, tokens: array<string>, ngrams: array<string>]

In [16]:
def create_2column_data(dataset, ngram_column, display_column, n):
    """Count the n-grams of a text column within each value of a grouping column.

    Despite its name, the returned DataFrame has three columns:
    ``ngrams``, ``display_column`` and ``count``.

    Parameters
    ----------
    dataset : pyspark.sql.DataFrame
        Input DataFrame containing ``ngram_column`` and ``display_column``.
    ngram_column : str
        Name of the text column to split into n-grams (via create_ngrams).
    display_column : str
        Name of the column to group the n-gram counts by (e.g. 'city').
    n : int
        Number of tokens per n-gram (1 = unigrams, 2 = bigrams).

    Returns
    -------
    pyspark.sql.DataFrame
        One row per distinct (n-gram, display_column value) pair, with the
        number of occurrences of that n-gram within that group.
    """
    grams = create_ngrams(dataset, ngram_column, n)
    # One row per n-gram occurrence. explode drops rows whose n-gram array
    # is empty or null.
    exploded = grams.withColumn('ngrams', F.explode(grams.ngrams))
    return exploded.groupby('ngrams', display_column).count()

In [17]:
# Use create_2column_data to count unigrams (n=1) of the job descriptions per city.
city = create_2column_data(
    company_jobs, ngram_column='job_description', display_column='city', n=1
)
city.show()

+------------+-------+-----+
|      ngrams|   city|count|
+------------+-------+-----+
|integration,| Austin|    1|
|       siri,| Austin|    1|
|   excellent| Austin|    1|
|    relevant| Austin|    6|
|           –| Austin|    1|
|         key|Boulder|    1|
|         her|Boulder|    1|
|     tables,| Boston|    2|
|      status| Boston|    2|
|   establish|Chicago|    1|
|       teams|Chicago|   12|
|         (or|Chicago|    3|
| skillsshare|Chicago|    1|
|      401(k)|Chicago|    1|
|      master|Chicago|    1|
|   required:|Chicago|    2|
|     systems|Chicago|    2|
|     cutting|Chicago|    2|
|  strategies|Chicago|    3|
|        true|Chicago|    1|
+------------+-------+-----+
only showing top 20 rows



In [18]:
# Use create_2column_data to count unigrams (n=1) of the job descriptions per industry.
industry1 = create_2column_data(
    company_jobs, ngram_column='job_description', display_column='industry', n=1
)
industry1.show()

+--------------------+--------------------+-----+
|              ngrams|            industry|count|
+--------------------+--------------------+-----+
|           barkthins|Consumer Packaged...|    1|
|             creates|Consumer Packaged...|    1|
|        formulation,|Consumer Packaged...|    1|
|            payments|   Health Care Plans|    2|
|           retention|   Health Care Plans|    1|
|       interpersonal|   Health Care Plans|    1|
|           business.|Retail - Apparel ...|    1|
|          employment|Retail - Apparel ...|    3|
|        development,|   Computer Hardware|    1|
|                  at|   Computer Hardware|    1|
|        agriculture,|   Computer Hardware|    1|
|            required|Medical Diagnosti...|    2|
|                some|Engineering & Con...|    4|
|              signed|Engineering & Con...|    6|
|implementationstrong|           Insurance|    1|
|                 -in|           Insurance|    1|
|              highly|     Credit Services|    3|


In [19]:
# Use create_2column_data to count bigrams (n=2) of the job descriptions per city.
city2 = create_2column_data(
    company_jobs, ngram_column='job_description', display_column='city', n=2
)
city2.show()

+--------------------+-------+-----+
|              ngrams|   city|count|
+--------------------+-------+-----+
|  related challenges| Austin|    1|
|     college degree,| Austin|    1|
|   opportunities for| Austin|    1|
|         skills used| Austin|    1|
|      color, gender,| Austin|    1|
|        life. today,| Austin|    1|
|          study data| Austin|    1|
| economics functions| Austin|    1|
|        and content.| Austin|    1|
|    proven technical| Austin|    1|
|      versatile work| Austin|    1|
|         sex, sexual| Austin|    1|
|            taking a|Boulder|    1|
|professional comm...|Boulder|    1|
|        when needed.|Boulder|    1|
|      and logistics.|Boulder|    1|
|     and affirmative|Boulder|    1|
|   protected factor.|Boulder|    1|
|         services to| Boston|    1|
|      development of| Boston|    5|
+--------------------+-------+-----+
only showing top 20 rows



In [20]:
# Use create_2column_data to count bigrams (n=2) of the job descriptions per industry.
industry2 = create_2column_data(
    company_jobs, ngram_column='job_description', display_column='industry', n=2
)
industry2.show()

+--------------------+--------------------+-----+
|              ngrams|            industry|count|
+--------------------+--------------------+-----+
|   between disparate| Aerospace & Defense|    1|
|            in clear| Aerospace & Defense|    1|
|analysis methodology| Aerospace & Defense|    1|
|      manner through| Aerospace & Defense|    1|
|          and around|Consumer Packaged...|    1|
|     master’s degree|     Medical Devices|    2|
|    understanding of|   Health Care Plans|    1|
|         will ensure|   Health Care Plans|    1|
|            within a|   Health Care Plans|    2|
|    complex analyses|   Health Care Plans|    1|
|machines, supervi...|   Health Care Plans|    1|
|theoretical knowl...|   Health Care Plans|    1|
|           this role|Retail - Apparel ...|    3|
|deployment infras...|Retail - Apparel ...|    1|
|       aggregate and|Retail - Apparel ...|    1|
|   computing systems|Retail - Apparel ...|    1|
|wsdl/soap interfa...|Retail - Apparel ...|    1|
