# Step 3
Here we load the JSON file containing Krishnamurti's speeches and work with it as a PySpark DataFrame.

Related resources:

* [PySpark in Colab](https://medium.com/linkit-intecs/pyspark-with-google-colab-d964fd693ca7)
* [PySpark Documentation](https://spark.apache.org/docs/3.1.3/api/python/getting_started/quickstart.html)
* [Spark SQL Basics](https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53)



**Installing Dependencies**

In [101]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [102]:
%%capture
# Install pyspark module (with additional SQL functionality)
!pip install pyspark[sql]

In [103]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [104]:
import pandas as pd

# Formatting pandas DataFrame display
from google.colab import data_table
data_table.enable_dataframe_formatter()

**Create PySpark DataFrame**  
We first have to upload `krishnamurti_clean.json` before running this code block.

In [105]:
# Reading in JSON
pandas_df = pd.read_json("krishnamurti_clean.json")

In [106]:
# Creating Spark DataFrame
df = spark.createDataFrame(pandas_df)

# Enabling eager evaluation so DataFrames render as tables in the notebook (up to 10 rows)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 10)

In [107]:
df.printSchema()

root
 |-- Text source: string (nullable = true)
 |-- Talk Type: string (nullable = true)
 |-- Participants Category: string (nullable = true)
 |-- Decade: string (nullable = true)
 |-- Date Code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- text: string (nullable = true)
 |-- Participants: string (nullable = true)



In [108]:
# Removing duplicates from dataframe
df = df.drop_duplicates(['text'])

In [109]:
df

Text source,Talk Type,Participants Category,Decade,Date Code,Country,City,text,Participants
VB,Discussion,,40s,471230,India,Madras,.. To love one ...,
AV,Discussion,,80s,850609-1,England,Brockwood Park,...Self-interest...,
AV,Discussion,Small Group,70s,760520-2,England,Brockwood Park,After this morni...,"Bohm, David / Sha..."
EP,Discussion,Small Group,70s,760520-2,England,Brockwood Park,After this morni...,"Bohm, David / Sha..."
EP,Discussion,K School,70s,711010,England,Brockwood Park,Am I always self...,
AV,Discussion,Public,70s,750803,Switzerland,Saanen,As this is the l...,
AV,Discussion,Public,70s,720808,Switzerland,Saanen,As this is the l...,
AV,Discussion,,80s,831216,India,Rishi Valley,As this is the l...,
AV,Discussion,Small Group,70s,760520-1,England,Brockwood Park,As you are such ...,"Bohm, David / Sha..."
EP,Discussion,Small Group,80s,801204-1,India,Rishi Valley,Asit and I have ...,"Dalal, Rajesh / H..."


In [110]:
df.select("Participants Category", "Country", "Decade").describe().show()

+-------+---------------------+---------+------+
|summary|Participants Category|  Country|Decade|
+-------+---------------------+---------+------+
|  count|                  285|      367|   367|
|   mean|                 null|     null|  null|
| stddev|                 null|     null|  null|
|    min|    Buddhist Scholars|Australia|   40s|
|    max|             Students|      USA|   80s|
+-------+---------------------+---------+------+



In [114]:
# Counting the number of speeches per city
df.groupby('City').count().show()

+--------------------+-----+
|                City|count|
+--------------------+-----+
|    Ojai, California|   24|
|              Saanen|   90|
|              London|    6|
|              Sydney|    2|
|               Paris|    1|
|             Rajghat|   12|
|     Bombay (Mumbai)|    2|
|           Amsterdam|    1|
|    Madras (Chennai)|   11|
|Sarobia, Pennsylv...|    1|
|      Brockwood Park|   70|
|              Bombay|   35|
|           New Delhi|   22|
|              Madras|   68|
|        Rishi Valley|   22|
+--------------------+-----+
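
The counts above list `Bombay` and `Bombay (Mumbai)`, and likewise `Madras` and `Madras (Chennai)`, as separate cities. Below is a minimal sketch of merging them, assuming the parenthetical is only an alternative name and can be dropped. `normalize_city` is a hypothetical helper, written in plain Python so the logic is easy to check; in Spark the same idea could probably be expressed with `regexp_replace` from `pyspark.sql.functions`.

```python
import re
from collections import Counter

# Counts copied from the groupby output above (only the rows relevant here)
city_counts = {
    "Bombay": 35,
    "Bombay (Mumbai)": 2,
    "Madras": 68,
    "Madras (Chennai)": 11,
    "Saanen": 90,
}

def normalize_city(name: str) -> str:
    """Hypothetical helper: drop a trailing parenthetical such as ' (Mumbai)'."""
    return re.sub(r"\s*\([^)]*\)\s*$", "", name)

# Sum the counts of all spellings that normalize to the same city name
merged = Counter()
for city, n in city_counts.items():
    merged[normalize_city(city)] += n

print(dict(merged))
```

Names without a parenthetical, such as `Ojai, California`, pass through unchanged.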



In [115]:
# Running SQL queries
df.createOrReplaceTempView("krishnaTable")
spark.sql("SELECT count(*) as number_of_speeches from krishnaTable").show()

+------------------+
|number_of_speeches|
+------------------+
|               367|
+------------------+



In [None]:
# User-defined functions can be registered and then invoked from SQL expressions
from pyspark.sql.functions import pandas_udf

# Placeholder UDF: returns the Decade column unchanged
@pandas_udf("string")
def fix_decades(s: pd.Series) -> pd.Series:
    return s

spark.udf.register("fix_decades", fix_decades)

df.selectExpr('fix_decades(Decade)').show()

In [120]:
# Summarizing the texts that contain the string "life"
df[df.text.rlike(r"life")].describe().show()

+-------+-----------+----------+---------------------+------+-----------------+---------+------+--------------------+--------------------+
|summary|Text source| Talk Type|Participants Category|Decade|        Date Code|  Country|  City|                text|        Participants|
+-------+-----------+----------+---------------------+------+-----------------+---------+------+--------------------+--------------------+
|  count|        328|       328|                  258|   328|              328|      328|   328|                 328|                  93|
|   mean|       null|      null|                 null|  null| 701700.219858156|     null|  null|                null|                null|
| stddev|       null|      null|                 null|  null|99932.28662015585|     null|  null|                null|                null|
|    min|         AV|Discussion|    Buddhist Scholars|   40s|           400909|Australia|Bombay| ..  To love one ...|Agnew [Smith], Wendy|
|    max|         VB|      
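
`rlike` performs an unanchored regular-expression search, so the pattern `life` selects every row whose text contains that substring anywhere. Selecting texts that begin with a digit would need an anchored pattern such as `^\d`. The sketch below shows the difference using Python's `re` module on made-up sample strings. Spark evaluates `rlike` with Java regular expressions, which should behave the same way for patterns this simple.

```python
import re

# Made-up sample strings, not taken from the dataset
samples = [
    "What is the meaning of life?",
    "1st question: what is fear?",
    "Lifelong habits are hard to see.",
]

# Unanchored, case-sensitive search: matches "life" anywhere in the string
contains_life = [s for s in samples if re.search(r"life", s)]

# Anchored search: matches only strings that begin with a digit
starts_with_digit = [s for s in samples if re.search(r"^\d", s)]

print(contains_life)
print(starts_with_digit)
```

The third sample is not matched by `life` because the search is case-sensitive.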

In [None]:
# Looking at attributes and methods of this df column
dir(df.text)

In [121]:
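# Renaming the text column
df = df.withColumnRenamed('text', 'Speech')
# Previewing the DataFrame without the unimportant columns
# (drop() returns a new DataFrame; df itself is not modified here)
df.drop("Text source", "Talk Type", "Participants Category", "Date Code", "Participants")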

Decade,Country,City,Speech
40s,India,Madras,.. To love one ...
80s,England,Brockwood Park,...Self-interest...
70s,England,Brockwood Park,After this morni...
70s,England,Brockwood Park,After this morni...
70s,England,Brockwood Park,Am I always self...
70s,Switzerland,Saanen,As this is the l...
70s,Switzerland,Saanen,As this is the l...
80s,India,Rishi Valley,As this is the l...
70s,England,Brockwood Park,As you are such ...
80s,India,Rishi Valley,Asit and I have ...


In [123]:
# Saving clean version
# Spark creates a directory named "spark_krishanmurti.json"; the data is in a file inside it named "part-0000...json"
df.select("Decade", "Country", "City", "Speech") \
.write \
.save("spark_krishanmurti.json", format="json")
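
Spark's JSON writer produces a directory of part files in JSON Lines format (one JSON object per line) rather than a single JSON document. Below is a hedged sketch of reading such a directory back with the standard library. `read_spark_json_dir` is a hypothetical helper, and the directory name in the usage comment is the one passed to `save` above.

```python
import glob
import json
import os

def read_spark_json_dir(path: str) -> list:
    """Hypothetical helper: load every record from the part files in a Spark JSON output directory."""
    records = []
    # Part files are read in name order; each non-empty line is one JSON object
    for part in sorted(glob.glob(os.path.join(path, "part-*.json"))):
        with open(part, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records

# Usage, assuming the save above has completed:
# rows = read_spark_json_dir("spark_krishanmurti.json")
```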

In [124]:
# Ending spark session
spark.stop()