In [None]:
# Mount Google Drive so the Crunchbase CSV under /content/gdrive/MyDrive can be read below.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
#Setting up PySpark in Colab
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=eafd146a19e4ccf21fe32e668b913071a36117a66ba34941997100bc11e0bdcb
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Create (or reuse) the PySpark session: local master, app name "Colab", Spark UI on port 4050.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the SparkSession created above.
spark

In [None]:
# Load the Crunchbase organizations CSV into a PySpark DataFrame, using the first row as column names.
df = spark.read.options(header=True).csv('/content/gdrive/MyDrive/crunchbase_odm_orgs.csv')


In [None]:
# Quick look at the data: print the first 10 rows of the DataFrame.
df.show(n=10)

+--------------------+--------------------+------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------+----------+------------+--------------------+
|                uuid|                name|        type|primary_role|              cb_url|       domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|          city|    region|country_code|   short_description|
+--------------------+--------------------+------------+------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------+----------+------------+--------------------+
|e1393508-30ea-8a3...|            Wetpaint|organization|     company|https://www.crunc...| wetpaint.com|http://www.wetpai

In [None]:
# Total number of rows loaded from the CSV.
df.count()

1127735

In [None]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [None]:
# Reconfigure Spark: take a copy of the active context's settings, adjust them, then stop
# that context so the next cell can start a new one with this configuration.
# NOTE: getOrCreate() returns the context behind the `spark` session built earlier, so
# stopping it also invalidates `spark` and `df` from the cells above.
sc = SparkContext.getOrCreate()
config = sc.getConf()

# With master "local" Spark uses a single worker thread, so the 4-core intent below was
# never realised; local[4] actually runs 4 worker threads.
config.setMaster("local[4]")

# spark.cores.max, spark.executor.memory and spark.driver.cores only take effect on
# standalone/cluster deployments, not in local mode; kept for portability.
config.set('spark.cores.max', '4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
# Only relevant if Kryo serialization is enabled (not configured in this notebook).
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")

sc.stop()

In [None]:
# Start a fresh SparkSession from the reconfigured `config`. SQLContext is deprecated since
# Spark 3.0, so build a SparkSession instead; it exposes the same `.read` API.
# Rebinding `spark` also replaces the session whose context was stopped in the previous cell.
spark = SparkSession.builder.config(conf=config).getOrCreate()
sc = spark.sparkContext
sqlContext = spark  # legacy alias: later cells call sqlContext.read...
print("Using Apache Spark Version", sc.version)

Using Apache Spark Version 3.2.1




In [None]:
# Path to the Crunchbase organizations CSV on the mounted Google Drive (same file loaded into `df` above).
cb_file = "/content/gdrive/MyDrive/crunchbase_odm_orgs.csv"

In [None]:
# Read the CSV with a header row, comma delimiter and schema inference, then count the rows.
# NOTE: the next cell reassigns cb_sdf, so this read mainly serves as a sanity check.
cb_sdf = (
    sqlContext.read
    .options(header="true", delimiter=",", inferSchema="true")
    .csv(cb_file)
)
cb_sdf.count()

1127735

In [None]:
# Load the CSV through the generic format("csv") reader with a header row and inferred column types.
# The former `treatEmptyValuesAsNulls` option belonged to the legacy Databricks spark-csv package;
# Spark's built-in CSV source does not recognise it and silently ignored it. Empty unquoted fields
# are already read as null because the built-in `nullValue` option defaults to "".
cb_sdf = (
    sqlContext.read.format("csv")
    .options(header='true', inferSchema='true')
    .load(cb_file)
)
cb_sdf.count()

1127735

In [None]:
# 1. Find all entities whose name starts with the letter "F" (e.g. Facebook):
#    show() the resulting Spark DataFrame and print its row count.
#    startswith() is case-sensitive, so names beginning with a lowercase "f" are not included.
cb_sdf_F = cb_sdf.filter(cb_sdf.name.startswith('F'))

# show() prints the table itself and returns None; wrapping it in print() emitted a stray "None".
cb_sdf_F.show()
print("There's", cb_sdf_F.count(), "entities with the name that starts with a letter 'F'.")

+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+-------------+------------+--------------------+
|                uuid|                name|        type|primary_role|              cb_url|              domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|         city|       region|country_code|   short_description|
+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+-------------+------------+--------------------+
|df662812-7f97-0b4...|            Facebook|organization|     company|https://www.crunc...|    

In [None]:
# 2. Find all entities located in New York City:
#    show() the resulting Spark DataFrame and print its row count.
#    Only rows whose city is exactly 'New York' match; other spellings, if present, are not counted.
cb_sdf_NY = cb_sdf.filter(cb_sdf['city'] == 'New York')

# show() prints the table itself and returns None; wrapping it in print() emitted a stray "None".
cb_sdf_NY.show()
print("There's", cb_sdf_NY.count(), "entities located in New York City.")

+--------------------+--------------------+------------+------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------+--------+------------+--------------------+
|                uuid|                name|        type|primary_role|              cb_url|             domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|    city|  region|country_code|   short_description|
+--------------------+--------------------+------------+------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------+--------+------------+--------------------+
|e1393508-30ea-8a3...|            Wetpaint|organization|     company|https://www.crunc...|       wetpaint.com|http://www.wetpai

In [None]:
# 3. Add a "Blog" column set to 1 if the "domain" field contains "blogspot.com", and 0 otherwise,
#    then show() only the records with Blog marked as 1.
from pyspark.sql.functions import when, lit

# Integer literals make Blog a numeric 0/1 flag (previously the strings "1"/"0").
# A null domain makes contains() null, which when() treats as not matched, so those rows get 0.
cb_sdf_Blog = cb_sdf.withColumn(
    "Blog",
    when(cb_sdf.domain.contains('blogspot.com'), lit(1)).otherwise(lit(0)),
)
cb_sdf_Blog.where(cb_sdf_Blog.Blog == 1).show()

#Source: https://sparkbyexamples.com/pyspark/pyspark-add-new-column-to-dataframe/#:~:text=Add%20New%20Column%20with%20Constant%20Value%20In%20PySpark%2C,add%20a%20NULL%20%2F%20None%20use%20lit%20%28None%29.


+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+------------+------------+--------------------+----+
|                uuid|                name|        type|primary_role|              cb_url|              domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|         city|      region|country_code|   short_description|Blog|
+--------------------+--------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+------------+------------+--------------------+----+
|783b2aa6-7742-69e...|     Sad Urdu Poetry|organization|     company|https://www.c

In [None]:
#4. Find all entities with names that are palindromes (name reads the same way forward and reverse, e.g. madam):
  #print the count and show() the resulting Spark DataFrame
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

def is_palindrome(entity_name, ignore_case=False):
    """Return whether ``entity_name`` reads the same forwards and backwards.

    Args:
        entity_name: The string to test, or None (a Python UDF receives None for null cells).
        ignore_case: If True, compare case-insensitively (so "Kayak" counts as a palindrome).
            Defaults to False, which keeps the exact character-by-character comparison.

    Returns:
        True or False, or None when ``entity_name`` is None so a null name stays null
        in the resulting column.
    """
    if entity_name is None:
        return None
    candidate = entity_name.casefold() if ignore_case else entity_name
    return candidate == candidate[::-1]

In [None]:
# Wrap is_palindrome as a boolean Spark UDF and keep only rows where it returns True
# (a null name yields null, which where() drops).
spark_udf = udf(is_palindrome, BooleanType())
palindrome_df = cb_sdf.withColumn('is_palindrome', spark_udf('name'))
# cache() so the Python UDF is not re-run over every row for both show() and count().
palindrome_df = palindrome_df.where(palindrome_df['is_palindrome']).cache()

# show() prints the table itself and returns None; wrapping it in print() emitted a stray "None".
palindrome_df.show()
print("There's", palindrome_df.count(), "entities with names that are palindromes.")


+--------------------+------+------------+------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+--------------+------------+--------------------+-------------+
|                uuid|  name|        type|primary_role|              cb_url|         domain|        homepage_url|            logo_url|        facebook_url|         twitter_url|        linkedin_url|combined_stock_symbols|         city|        region|country_code|   short_description|is_palindrome|
+--------------------+------+------------+------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+--------------+------------+--------------------+-------------+
|ae1ec5c5-5352-cd5...| KAYAK|organization|     company|https://www.crunc...|      kayak.com|http://www.kay