## Using PySpark to Read Data

Darius Smith | BrainStation | March 29, 2023

**Troubleshooting: received an error due to the Spark configuration set differently. Running the following piece of code in a new cell below at the very beginning of the notebook first**

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

**Initializing Spark**

In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1680010224891_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f6d522544d0>

**Checking the available python packages and potentially adding new ones**

In [3]:
sc.list_packages()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

aws-cfn-bootstrap (2.0)
beautifulsoup4 (4.9.1)
boto (2.49.0)
click (7.1.2)
docutils (0.14)
jmespath (0.10.0)
joblib (0.15.1)
lockfile (0.11.0)
lxml (4.5.1)
mysqlclient (1.4.2)
nltk (3.5)
nose (1.3.4)
numpy (1.16.5)
pip (9.0.1)
py-dateutil (2.2)
pystache (0.5.4)
python-daemon (2.2.3)
python37-sagemaker-pyspark (1.3.0)
pytz (2020.1)
PyYAML (5.3.1)
regex (2020.6.8)
setuptools (28.8.0)
simplejson (3.2.0)
six (1.13.0)
soupsieve (1.9.5)
tqdm (4.46.1)
wheel (0.29.0)
windmill (1.6)

You are using pip version 9.0.1, however version 23.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

In [4]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1680010224891_0005,pyspark,idle,Link,Link,✔


**Installing data science & plotting packages**

In [5]:
sc.install_pypi_package("pandas==0.25.1") 
sc.install_pypi_package("matplotlib==3.1.1")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Using cached https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

Collecting matplotlib==3.1.1
  Using cached https://files.pythonhosted.org/packages/19/7a/60bd79c5d79559150f8bba866dd7d434f0a170312e4d15e8aefa5faba294/matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 (from matplotlib==3.1.1)
  Using cached https://files.pythonhosted.org/packages/6c/10/a7d0fa5baea8fe7b50f448ab742f26f52b80bfca85ac2be9d35cdd9a3246/pyparsing-3.0.9-py3-none-any.whl
Collecting cycler>=0.10 (from matplo

**Checking that the packages have been imported**

In [6]:
import pandas as pd
import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Reading data into an RDD directly from a datasource**

In [7]:
df = spark.read.csv('s3://brainstation-dsft/eng_1M_1gram.csv', header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- token: string (nullable = true)
 |-- year: string (nullable = true)
 |-- frequency: string (nullable = true)
 |-- pages: string (nullable = true)
 |-- books: string (nullable = true)

In [9]:
df.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['token', 'year', 'frequency', 'pages', 'books']

In [10]:
df.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(token='inGermany', year='1927', frequency='2', pages='2', books='2'), Row(token='inGermany', year='1929', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1930', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1933', frequency='1', pages='1', books='1'), Row(token='inGermany', year='1934', frequency='1', pages='1', books='1')]

In [11]:
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
+---------+----+---------+-----+-----+
only showing top 5 rows

In [12]:
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

261823225

**Filtering the data where `token` is "data"**

In [13]:
#Register the dataframe as a view
df.createOrReplaceTempView("eng_gram")

#Execute a SQL query/save as dataframe
token_df = spark.sql("SELECT token, year \
                          FROM eng_gram \
                          WHERE token LIKE '%data%' \
                          GROUP BY token, year")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
token_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----+
|      token|year|
+-----------+----+
|    laudata|1965|
|   laudatae|1838|
|laticaudata|1957|
|     nndata|1928|
|     sedata|1896|
+-----------+----+
only showing top 5 rows

**Counting the number of rows that have 'data' in the `token` column.**

In [15]:
count = spark.sql("SELECT COUNT(*) FROM eng_gram WHERE token LIKE '%data%'").collect()[0][0]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
print(count)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

24642

**There are 24,642 rows that contain 'data.'** Now, I will write the filtered data back to a directory in the HDFS from Spark using df.write.csv(). I will use the header=True parameter and examine the contents of what I've written.

In [21]:
new_token_df = token_df.coalesce(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
new_token_df.write.csv('s3://big-data-aws/eng_1M_1gram_.csv', header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…