In [1]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window

In [2]:
# create spark session
spark = SparkSession.builder.getOrCreate()

In [3]:
# create a new dataframe from csv file
# when inferSchema is True spark scans the file once to detect the schema
companies = spark.read.csv(
            "companies_sorted.csv", 
            header=True, inferSchema=True)

In [4]:
# examine record count
companies.count()

7173438

In [5]:
# review schema detected, note that all columns are detected as a string
companies.dtypes

[('_c0', 'string'),
 ('name', 'string'),
 ('domain', 'string'),
 ('year founded', 'string'),
 ('industry', 'string'),
 ('size range', 'string'),
 ('locality', 'string'),
 ('country', 'string'),
 ('linkedin url', 'string'),
 ('current employee estimate', 'string'),
 ('total employee estimate', 'string')]

In [6]:
# review the first 10 records
companies.take(10)

[Row(_c0='5872184', name='ibm', domain='ibm.com', year founded='1911.0', industry='information technology and services', size range='10001+', locality='new york, new york, united states', country='united states', linkedin url='linkedin.com/company/ibm', current employee estimate='274047', total employee estimate='716906'),
 Row(_c0='4425416', name='tata consultancy services', domain='tcs.com', year founded='1968.0', industry='information technology and services', size range='10001+', locality='bombay, maharashtra, india', country='india', linkedin url='linkedin.com/company/tata-consultancy-services', current employee estimate='190771', total employee estimate='341369'),
 Row(_c0='21074', name='accenture', domain='accenture.com', year founded='1989.0', industry='information technology and services', size range='10001+', locality='dublin, dublin, ireland', country='ireland', linkedin url='linkedin.com/company/accenture', current employee estimate='190689', total employee estimate='455768

In [8]:
# lets take a closer look at year founded, we expect this to be numeric
companies.describe(['year founded']).show()

+-------+------------------+
|summary|      year founded|
+-------+------------------+
|  count|           3566539|
|   mean|2001.7467488403838|
| stddev|20.966201464333643|
|    min|  ""allhotels""​)"|
|    max|           россия"|
+-------+------------------+



In [9]:
#from the summary, it appears that the file was not parsed correctly. Let's take a closer look at one of the records
companies.filter(companies["year founded"].contains('allhotels')).take(1)

[Row(_c0='4133811', name='"lore group (trade mark: ""lore travel""\u200b', domain=' ""visit armenia""\u200b', year founded=' ""allhotels""\u200b)"', industry='loretravel.com', size range='2011.0', locality='leisure, travel & tourism', country='1 - 10', linkedin url=None, current employee estimate=None, total employee estimate='linkedin.com/company/lore-group-trade-mark-lore-travel-visit-armenia-allhotels-')]

In [9]:
# it appears the escape character is the double-quote character, let's reload the file
companies = spark.read.csv(
        "companies_sorted.csv", 
         header=True, inferSchema=True, escape='"')

In [10]:
# lets look at 'year founded' again
companies.describe(['year founded']).show()

+-------+------------------+
|summary|      year founded|
+-------+------------------+
|  count|           3566452|
|   mean|2001.7466646029852|
| stddev| 20.96628369220692|
|    min|            1451.0|
|    max|    sporting goods|
+-------+------------------+



In [11]:
# from the summary, the file is still not parsed correctly. Lets take a closer look at one of the records
companies.filter(companies["year founded"].contains('sporting goods')).take(1)

[Row(_c0='www.edsports.co.uk"', name=None, domain=None, year founded='sporting goods', industry='1 - 10', size range='riverhead, new york, united states', locality='united states', country='linkedin.com/company/edwards-sports-ctr', linkedin url='0', current employee estimate=1, total employee estimate=None)]

In [12]:
# the first column of that row parsed incorrectly, It starts with a domain name which suggests a multi-line issue
companies = spark.read.csv(
        "companies_sorted.csv",
        header=True, inferSchema=True, escape='"', multiLine=True)

In [14]:
# lets look at 'year founded' again
companies.describe(['year founded']).show()

+-------+------------------+
|summary|      year founded|
+-------+------------------+
|  count|           3566446|
|   mean|2001.7466685322026|
| stddev|20.966273484947724|
|    min|            1451.0|
|    max|            2103.0|
+-------+------------------+



In [13]:
# let review schema detected again
companies.dtypes

[('_c0', 'int'),
 ('name', 'string'),
 ('domain', 'string'),
 ('year founded', 'double'),
 ('industry', 'string'),
 ('size range', 'string'),
 ('locality', 'string'),
 ('country', 'string'),
 ('linkedin url', 'string'),
 ('current employee estimate', 'int'),
 ('total employee estimate', 'int')]

In [14]:
# review record count again
companies.count()

7173426

In [15]:
# review column names
companies.columns

['_c0',
 'name',
 'domain',
 'year founded',
 'industry',
 'size range',
 'locality',
 'country',
 'linkedin url',
 'current employee estimate',
 'total employee estimate']

In [16]:
#rename column
companies = companies.withColumnRenamed('_c0','rowNum')

In [17]:
# review column names
companies.columns

['rowNum',
 'name',
 'domain',
 'year founded',
 'industry',
 'size range',
 'locality',
 'country',
 'linkedin url',
 'current employee estimate',
 'total employee estimate']

In [18]:
# filter companies with no domain name
companies = companies.filter(companies.domain.isNotNull())

In [19]:
#lets review a unique list of countries
companies.select('country').distinct().show(50)

+--------------------+
|             country|
+--------------------+
|            guernsey|
|               aruba|
|             finland|
|           australia|
|              greece|
|           greenland|
|            portugal|
|              israel|
|             ukraine|
|             nigeria|
|netherlands antilles|
|              angola|
|             eritrea|
|         timor-leste|
|              zambia|
|        cook islands|
|          seychelles|
|       liechtenstein|
|             bermuda|
|               macau|
|          uzbekistan|
|              kosovo|
|           guatemala|
|                iraq|
|              poland|
|         south korea|
|             austria|
|                fiji|
|           mauritius|
|      american samoa|
|          kazakhstan|
|           nicaragua|
|              kuwait|
|        turkmenistan|
|            anguilla|
|         saint lucia|
|               niger|
|           gibraltar|
|             albania|
|                mali|
|          

In [20]:
# lets review a unique list of countries and record count
companies.groupBy('country').count().sort('count', ascending=False).show()

+--------------+-------+
|       country|  count|
+--------------+-------+
| united states|1767278|
|          null|1566173|
|united kingdom| 379198|
|        canada| 166860|
|         india| 130023|
|   netherlands| 128183|
|         spain| 116353|
|       germany| 110269|
|     australia| 107341|
|        brazil|  99109|
|        france|  96682|
|         italy|  93344|
|       belgium|  36173|
|        turkey|  33353|
|        sweden|  32537|
|         china|  31572|
|        mexico|  31154|
|   switzerland|  30255|
|       denmark|  27366|
|  south africa|  24708|
+--------------+-------+
only showing top 20 rows



In [21]:
# lets review a unique list of locality and record count, 
# Observe that show action truncates the locality string.
companies.groupBy('locality').count().sort('count', ascending=False).show(10)

+--------------------+-------+
|            locality|  count|
+--------------------+-------+
|                null|1693997|
|london, greater l...|  69075|
|new york, new yor...|  55494|
|madrid, madrid, s...|  35381|
|paris, île-de-fra...|  29995|
|chicago, illinois...|  26203|
|los angeles, cali...|  25786|
|toronto, ontario,...|  25706|
|san francisco, ca...|  24830|
|sao paulo, sao pa...|  23631|
+--------------------+-------+
only showing top 10 rows



In [22]:
# use an additional parameter for show command
# See documentation for parameters: https://spark.apache.org/docs/3.1.2/api/python/_modules/pyspark/sql/dataframe.html#DataFrame.show
companies.groupBy('locality').count().sort('count', ascending=False).show(truncate=False)

+----------------------------------------+-------+
|locality                                |count  |
+----------------------------------------+-------+
|null                                    |1693997|
|london, greater london, united kingdom  |69075  |
|new york, new york, united states       |55494  |
|madrid, madrid, spain                   |35381  |
|paris, île-de-france, france            |29995  |
|chicago, illinois, united states        |26203  |
|los angeles, california, united states  |25786  |
|toronto, ontario, canada                |25706  |
|san francisco, california, united states|24830  |
|sao paulo, sao paulo, brazil            |23631  |
|houston, texas, united states           |23630  |
|london, london, united kingdom          |22855  |
|bombay, maharashtra, india              |18061  |
|atlanta, georgia, united states         |17052  |
|barcelona, catalonia, spain             |17046  |
|austin, texas, united states            |16640  |
|san diego, california, united 

In [23]:
# lets verify that each locality has exactly 2 commas (when locality is available)
# This is an example of calling methods on RDD directly, map function not available in DataFrame. 
# Usually method calls on DataFrames are more efficient because it can be optimized by the spark platform
companies.rdd.map(
    lambda company : (
        company['locality'].count(",") if company['locality'] is not None else 0,
        )).toDF(['comma_count']).groupBy('comma_count').count().show(10)

+-----------+-------+
|comma_count|  count|
+-----------+-------+
|          0|1693997|
|          2|3828808|
+-----------+-------+



In [24]:
# parse city and state from locality
companies = companies\
                .withColumn('city', F.split(companies['locality'], ',')[0])\
                .withColumn('state', F.split(companies['locality'], ',')[1])

In [25]:
#identify most popular company names
companies.groupBy('name').count()\
        .orderBy("count", ascending=False)\
        .show(50, truncate=False)

+-------------------------+-----+
|name                     |count|
+-------------------------+-----+
|independent consultant   |40   |
|private practice         |40   |
|consultant               |37   |
|independent              |33   |
|indépendant              |27   |
|independent contractor   |26   |
|private                  |25   |
|confidential             |22   |
|autónomo                 |22   |
|profesional independiente|20   |
|shift                    |19   |
|{displayname}            |18   |
|entrepreneur             |18   |
|selbstständig            |18   |
|independiente            |18   |
|elevate                  |17   |
|autônomo                 |17   |
|indigo                   |17   |
|law firm                 |17   |
|cmi                      |16   |
|various                  |16   |
|aspire                   |15   |
|libero professionista    |15   |
|studio legale            |14   |
|autonomo                 |14   |
|author                   |14   |
|zelfstandige 

In [27]:
# When more than 1 companies share a name, we will like to keep the company with the most employees. 
# To do this use the window function within the name partition; 
# name count for rows with the same name
# row number for rows with the same name in the order of the employee estimate
companies = companies.withColumn("name_count", F.count("*").over(
                        Window.partitionBy("name")))\
                .withColumn("row_num", F.row_number().over(
                        Window.partitionBy("name").orderBy(F.desc("total employee estimate"))))

In [28]:
# preview the result of the window function, observe row_num. we are going to drop any row where row_num != 1
companies.select('name','country','industry',
                'total employee estimate', 'name_count', 'row_num')\
        .filter('name_count>1')\
        .show(truncate=False)

+--------------------+--------------+-----------------------------------+-----------------------+----------+-------+
|name                |country       |industry                           |total employee estimate|name_count|row_num|
+--------------------+--------------+-----------------------------------+-----------------------+----------+-------+
|11th hour marketing |united states |marketing and advertising          |3                      |2         |1      |
|11th hour marketing |united states |marketing and advertising          |1                      |2         |2      |
|3dguy               |south africa  |design                             |4                      |2         |1      |
|3dguy               |united states |media production                   |3                      |2         |2      |
|4hd                 |null          |information technology and services|1                      |2         |1      |
|4hd                 |united states |management consulting      

In [29]:
# Filter out row_num>1, this are the companies with the duplicate names to be eliminated
companies = companies.filter('row_num=1')

In [30]:
# are there still duplicate names?
companies.groupBy('name').count().orderBy("count", ascending=False).show(50, truncate=False)

+-----------------------------------------------+-----+
|name                                           |count|
+-----------------------------------------------+-----+
|"40-02" search advertisement agency            |1    |
|"dental care with a difference"®, pc           |1    |
|"increase business sales" travel incentives    |1    |
|"kazakhmys drilling" тоо                       |1    |
|#socentcph - social entrepreneurship copenhagen|1    |
|&proud                                         |1    |
|'freshxpressions'                              |1    |
|(a2d) addicted 2 decor atx, llc                |1    |
|(ipc) industrial power cooling ltd.            |1    |
|+opinião                                       |1    |
|- dnb -                                        |1    |
|...theendofirony.net                           |1    |
|.::: shoppe :::.                               |1    |
|.guōshí                                        |1    |
|.me ideias - house gráfica                     

In [31]:
# drop window columns
companies = companies.drop("row_num").drop("name_count")

In [32]:
# create  dataframe with the top cities i.e. cities with over a 1000 companies
top_cities = companies.groupBy('city','state')\
        .count().sort('count', ascending=False)\
        .filter('count>1000')

In [33]:
# review the top cities
top_cities.show()

+-------------+---------------+-------+
|         city|          state|  count|
+-------------+---------------+-------+
|         null|           null|1673030|
|       london| greater london|  67942|
|     new york|       new york|  54609|
|       madrid|         madrid|  35015|
|        paris|  île-de-france|  29490|
|      chicago|       illinois|  25847|
|  los angeles|     california|  25344|
|      toronto|        ontario|  25274|
|san francisco|     california|  24237|
|    sao paulo|      sao paulo|  23393|
|      houston|          texas|  23313|
|       london|         london|  22419|
|       bombay|    maharashtra|  17907|
|    barcelona|      catalonia|  16875|
|      atlanta|        georgia|  16775|
|       austin|          texas|  16396|
|    san diego|     california|  15593|
|       dallas|          texas|  15270|
|        dubai|          dubai|  14846|
|     montréal|         quebec|  13807|
+-------------+---------------+-------+
only showing top 20 rows



In [34]:
# Using the companies dataset only return rows where city and state exists in the top_cities dataframe
companies_in_top_cities = companies.join(top_cities, 
                    (companies.city==top_cities.city) & 
                    (companies.state==top_cities.state),
                    'inner').select(companies['*'])

In [35]:
#review the top cities in this new dataset in ascending order of company count
companies_in_top_cities.groupBy('city','state')\
        .count().sort('count', ascending=True)\
            .show()

+--------------+------------------+-----+
|          city|             state|count|
+--------------+------------------+-----+
|     stavanger|          rogaland| 1001|
|  gaithersburg|          maryland| 1005|
|        moscow|            moscow| 1005|
|    north york|           ontario| 1012|
|     frederick|          maryland| 1013|
|       barueri|         sao paulo| 1016|
|       lansing|          michigan| 1026|
|     brentwood|         tennessee| 1028|
|       sharjah|           sharjah| 1030|
| staten island|          new york| 1030|
|        duluth|           georgia| 1031|
|       lubbock|             texas| 1033|
|       denmark| western australia| 1034|
|west hollywood|        california| 1036|
|  white plains|          new york| 1038|
|      la jolla|        california| 1041|
|        carmel|           indiana| 1042|
|       ipswich|           suffolk| 1042|
|       vaughan|           ontario| 1046|
|          baku|              baki| 1050|
+--------------+------------------