<a href="https://colab.research.google.com/github/CoolandHot/colab_tricks/blob/main/pyspark_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

# ngrok auth token used to open the public tunnel.
# Prefer the NGROK_AUTHTOKEN environment variable so a real token is never saved
# inside the notebook; the placeholder is used only when the variable is unset.
ngrok_token = os.environ.get("NGROK_AUTHTOKEN", "<your ngrok token>")
# Local port to tunnel. Kept as a string because it is interpolated into a
# shell command and passed as the value of the `spark.ui.port` setting.
expose_port = '4050'

# Install local Spark with Hadoop
Then set the environment variables (`JAVA_HOME`, `SPARK_HOME`) so that Spark can be found.

In [None]:
#@title prerequisites
%%capture
!pip install pyspark

# download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install Apache Spark with Hadoop
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# findspark will locate Spark on the system and import it as a regular library.
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
import findspark
findspark.init()
findspark.find()

## Spark UI

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
# run ngrok in the background
!nohup ./ngrok authtoken {ngrok_token}
!nohup ./ngrok http {expose_port} &
!curl --silent --max-time 10 --connect-timeout 5 --show-error http://127.0.0.1:4040/api/tunnels | sed -nE 's/.*public_url":"https:..([^"]*).*/\1/p'

## pyspark

[PySpark Tutorial](https://www.youtube.com/watch?v=_C8kWso4ne4)

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session named "Colab" whose web UI is served
# on `expose_port`, i.e. the port that the ngrok tunnel forwards.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', expose_port)
    .getOrCreate()
)

spark

In [None]:
%%capture
# download dataset sample
!wget --continue https://raw.githubusercontent.com/GarvitArya/pyspark-demo/main/sample_books.json -O /tmp/sample_books.json

import pandas as pd
pd.DataFrame([{'name': 'Huan', 'work': 'Postgraduate', 'year': 3, 'salary': 12000, 'age': 33}, 
              {'name': 'John', 'work': 'manager', 'year': 2, 'salary': 8400, 'age': 23}, 
              {'name': 'Ken', 'work': pd.NA, 'year': pd.NA, 'salary': 8000, 'age': 62},
              {'name': pd.NA, 'work': 'Engineer', 'year': 2, 'salary': 5000, 'age': 27},
              {'name': 'Tom', 'work': 'HR', 'year': 4, 'salary': pd.NA, 'age': 21},
              {'name': 'Hellen', 'work': 'HR', 'year': 10, 'salary': 4400, 'age': 43}])\
  .to_csv('random.csv', index=False)

In [None]:
# Read random.csv with a header row. inferSchema makes Spark detect column
# types; without it every column would be read as a string.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv("random.csv")

df.printSchema()      # column names and inferred types
df.show()             # the rows themselves
df.describe().show()  # summary statistics per column

root
 |-- name: string (nullable = true)
 |-- work: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)

+------+------------+----+------+---+
|  name|        work|year|salary|age|
+------+------------+----+------+---+
|  Huan|Postgraduate|   3| 12000| 33|
|  John|     manager|   2|  8400| 23|
|   Ken|        null|null|  8000| 62|
|  null|    Engineer|   2|  5000| 27|
|   Tom|          HR|   4|  null| 21|
|Hellen|          HR|  10|  4400| 43|
+------+------------+----+------+---+

+-------+------+--------+------------------+------------------+------------------+
|summary|  name|    work|              year|            salary|               age|
+-------+------+--------+------------------+------------------+------------------+
|  count|     5|       5|                 5|                 5|                 6|
|   mean|  null|    null|               4.2|            7560.0|34.833333333333336|
| stddev|  nul

In [None]:
# add column: withColumn returns a new DataFrame rather than modifying `df`,
# so the result has to be bound to a name to be kept
df_with_next_year = df.withColumn('add_one_more_year', df['year'] + 1)
df_with_next_year.show()
# rename column: also returns a new DataFrame; here it is only displayed
df_with_next_year.withColumnRenamed('add_one_more_year', 'next_year').show()
# drop column: `df` ends up with its original set of columns
df = df_with_next_year.drop('add_one_more_year')
df.show()

+----+------------+----+------+-----------------+
|name|        work|year|salary|add_one_more_year|
+----+------------+----+------+-----------------+
|Huan|Postgraduate|   3| 12000|                4|
|John|     manager|   2|  8400|                3|
| Ken|        null|null|  8000|             null|
|null|    Engineer|   2|  5000|                3|
| Tom|          HR|   4|  null|                5|
+----+------------+----+------+-----------------+

+----+------------+----+------+---------+
|name|        work|year|salary|next_year|
+----+------------+----+------+---------+
|Huan|Postgraduate|   3| 12000|        4|
|John|     manager|   2|  8400|        3|
| Ken|        null|null|  8000|     null|
|null|    Engineer|   2|  5000|        3|
| Tom|          HR|   4|  null|        5|
+----+------------+----+------+---------+

+----+------------+----+------+
|name|        work|year|salary|
+----+------------+----+------+
|Huan|Postgraduate|   3| 12000|
|John|     manager|   2|  8400|
| Ken|    

Lambda function as a user-defined function (UDF)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType


def _add_three(value):
    """Return ``value + 3``; a SQL NULL (``None``) is passed through unchanged."""
    # Spark hands NULL cells to a Python UDF as None; `None + 3` would raise.
    return None if value is None else value + 3


# Wrap the Python function as a Spark UDF that returns an integer column.
lambdaFn = F.udf(_add_three, IntegerType())
# Rows containing any null are dropped first, then the UDF is applied to `salary`.
# NOTE: despite its name, 'add_3_more_year' holds salary + 3 (see the input column).
df.na.drop()\
    .withColumn('add_3_more_year', lambdaFn(df.salary))\
    .show()

+------+------------+----+------+---+------------+--------------+---------------+
|  name|        work|year|salary|age|year_imputed|salary_imputed|add_3_more_year|
+------+------------+----+------+---+------------+--------------+---------------+
|  Huan|Postgraduate|   3| 12000| 33|           3|         12000|          12003|
|  John|     manager|   2|  8400| 23|           2|          8400|           8403|
|Hellen|          HR|  10|  4400| 43|          10|          4400|           4403|
+------+------------+----+------+---+------------+--------------+---------------+



In [None]:
# drop rows with missing values
# NOTE: when `thresh` is given it overrides `how`: a row is kept only if it has
# at least `thresh` non-null values among the `subset` columns.
df.na.drop(how='any',
           thresh=2, # keep rows with >= 2 non-null values in the subset
           subset=['work', 'year'] # only check these columns
           ).show()
# fill nulls in the 'work' column only
df.na.fill('missing', subset=['work']).show()
# a string fill value is applied to string columns only
df.fillna('missing_with_fillna').show()


# impute
from pyspark.ml.feature import Imputer
impute_inputs = ['year', 'salary']
impute_outputs = ['{}_imputed'.format(c) for c in impute_inputs]
imputer = Imputer(
            inputCols=impute_inputs,
            outputCols=impute_outputs
        ).setStrategy('median')
# Make the cell safe to re-run: if `df` already carries the *_imputed columns
# from a previous execution, remove them first (dropping a column that does not
# exist is a no-op), otherwise the transform fails on the existing output columns.
df_without_imputed = df.drop(*impute_outputs)
df = imputer.fit(df_without_imputed).transform(df_without_imputed)
df.show()

+------+------------+----+------+---+------------+--------------+
|  name|        work|year|salary|age|year_imputed|salary_imputed|
+------+------------+----+------+---+------------+--------------+
|  Huan|Postgraduate|   3| 12000| 33|           3|         12000|
|  John|     manager|   2|  8400| 23|           2|          8400|
|  null|    Engineer|   2|  5000| 27|           2|          5000|
|   Tom|          HR|   4|  null| 21|           4|          8000|
|Hellen|          HR|  10|  4400| 43|          10|          4400|
+------+------------+----+------+---+------------+--------------+

+------+------------+----+------+---+------------+--------------+
|  name|        work|year|salary|age|year_imputed|salary_imputed|
+------+------------+----+------+---+------------+--------------+
|  Huan|Postgraduate|   3| 12000| 33|           3|         12000|
|  John|     manager|   2|  8400| 23|           2|          8400|
|   Ken|     missing|null|  8000| 62|           3|          8000|
|  null| 

In [None]:
# Number of partitions of the DataFrame, read from its underlying RDD.
df.rdd.getNumPartitions()

1

In [None]:
# Total number of rows in the DataFrame.
df.count()

5

In [None]:
# Keep only the imputed versions of year/salary from here on.
df2 = df.drop('year', 'salary')
df2.show()

# Filtering with a SQL expression string.
df2.where('salary_imputed > 8000 AND year_imputed < 4 AND work IS NOT NULL')\
   .select('name', 'work')\
   .show()

# Filtering with Column expressions. Careful: the leading ~ negates only the
# salary condition, so this selects rows with salary NOT above 8000 AND year below 4.
salary_above_8000 = df2['salary_imputed'] > 8000
year_below_4 = df2['year_imputed'] < 4
df2.where(~salary_above_8000 & year_below_4)\
   .select('name', 'work')\
   .show()

+------+------------+---+------------+--------------+
|  name|        work|age|year_imputed|salary_imputed|
+------+------------+---+------------+--------------+
|  Huan|Postgraduate| 33|           3|         12000|
|  John|     manager| 23|           2|          8400|
|   Ken|        null| 62|           3|          8000|
|  null|    Engineer| 27|           2|          5000|
|   Tom|          HR| 21|           4|          8000|
|Hellen|          HR| 43|          10|          4400|
+------+------------+---+------------+--------------+

+----+------------+
|name|        work|
+----+------------+
|Huan|Postgraduate|
|John|     manager|
+----+------------+

+----+--------+
|name|    work|
+----+--------+
| Ken|    null|
|null|Engineer|
+----+--------+



In [None]:
# Group the rows by job title once, then show two aggregates per group:
# the number of rows, and the sum of every numeric column.
rows_by_work = df2.groupBy('work')
rows_by_work.count().show()
rows_by_work.sum().show()

+------------+-----+
|        work|count|
+------------+-----+
|          HR|    2|
|     manager|    1|
|Postgraduate|    1|
|    Engineer|    1|
|        null|    1|
+------------+-----+

+------------+--------+-----------------+-------------------+
|        work|sum(age)|sum(year_imputed)|sum(salary_imputed)|
+------------+--------+-----------------+-------------------+
|          HR|      64|               14|              12400|
|     manager|      23|                2|               8400|
|Postgraduate|      33|                3|              12000|
|    Engineer|      27|                2|               5000|
|        null|      62|                3|               8000|
+------------+--------+-----------------+-------------------+



Linear Regression

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Encode the categorical 'work' column as a numeric index column.
# handleInvalid='skip' leaves out rows whose 'work' is NULL.
stringIndx = StringIndexer(
    inputCol='work',
    outputCol='work_indexed',
    handleInvalid='skip',
)
work_index_model = stringIndx.fit(df2)
df3 = work_index_model.transform(df2)
df3.show()

# The ML model expects its independent features as one vector per row,
# so the individual feature columns are packed into a single new column.
feature_columns = ['work_indexed', 'age', 'year_imputed']
feature_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='indp_vectors',
)
indepent_feature_vectors = feature_assembler.transform(df3)
indepent_feature_vectors.show()

+------+------------+---+------------+--------------+------------+
|  name|        work|age|year_imputed|salary_imputed|work_indexed|
+------+------------+---+------------+--------------+------------+
|  Huan|Postgraduate| 33|           3|         12000|         2.0|
|  John|     manager| 23|           2|          8400|         3.0|
|  null|    Engineer| 27|           2|          5000|         1.0|
|   Tom|          HR| 21|           4|          8000|         0.0|
|Hellen|          HR| 43|          10|          4400|         0.0|
+------+------------+---+------------+--------------+------------+

+------+------------+---+------------+--------------+------------+---------------+
|  name|        work|age|year_imputed|salary_imputed|work_indexed|   indp_vectors|
+------+------------+---+------------+--------------+------------+---------------+
|  Huan|Postgraduate| 33|           3|         12000|         2.0| [2.0,33.0,3.0]|
|  John|     manager| 23|           2|          8400|         3.

In [None]:
# Split into roughly 75% training / 25% test rows.
# A fixed seed makes the random split, and therefore the fitted coefficients
# below, repeatable between runs of the cell.
SPLIT_SEED = 42
train_set, test_set = indepent_feature_vectors.randomSplit([0.75, 0.25], seed=SPLIT_SEED)
# Regularised linear regression predicting salary_imputed from the feature vector.
lr = LinearRegression(featuresCol='indp_vectors', labelCol='salary_imputed',
                       maxIter=20, regParam=0.1)
model = lr.fit(train_set)
print('intercept:', model.intercept)
print('coefficients:', model.coefficients)

intercept: -11748.95964034237
coefficients: [1833.2519640527532,305.541994049599,3333.135723427926]


In [None]:
# Score the held-out rows. Only the feature vector, the name and the true
# salary are passed in; the model appends a 'prediction' column.
test_inputs = test_set.select('indp_vectors', 'name', 'salary_imputed')
linearRegResult = model.transform(test_inputs)
linearRegResult.show()

+---------------+------+--------------+------------------+
|   indp_vectors|  name|salary_imputed|        prediction|
+---------------+------+--------------+------------------+
|[0.0,43.0,10.0]|Hellen|          4400|34720.703338069645|
| [3.0,23.0,2.0]|  John|          8400| 7444.533561812519|
+---------------+------+--------------+------------------+



AttributeError: ignored

In [None]:
# Load the sample books dataset downloaded earlier.
# NOTE: this rebinds `df`, which previously held the people table.
df = spark.read.format("json").load("/tmp/sample_books.json")
# Preview three rows of a few selected columns.
df.select("title", "price", "year_written").show(3)

+----------------+-----+------------+
|           title|price|year_written|
+----------------+-----+------------+
|Northanger Abbey| 18.2|        1814|
|   War and Peace| 12.7|        1865|
|   Anna Karenina| 13.5|        1875|
+----------------+-----+------------+
only showing top 3 rows



In [None]:
# Print the books DataFrame as a text table.
df.show()

+--------------------+-----------------+-----+--------------------+------------+
|              author|          edition|price|               title|year_written|
+--------------------+-----------------+-----+--------------------+------------+
|        Austen, Jane|          Penguin| 18.2|    Northanger Abbey|        1814|
|        Tolstoy, Leo|          Penguin| 12.7|       War and Peace|        1865|
|        Tolstoy, Leo|          Penguin| 13.5|       Anna Karenina|        1875|
|     Woolf, Virginia|   Harcourt Brace| 25.0|       Mrs. Dalloway|        1925|
|Cunnningham, Michael|   Harcourt Brace|12.35|           The Hours|        1999|
|         Twain, Mark|          Penguin| 5.76|    Huckleberry Finn|        1865|
|    Dickens, Charles|     Random House| 5.75|         Bleak House|        1870|
|         Twain, Mark|     Random House| 7.75|          Tom Sawyer|        1862|
|     Woolf, Virginia|          Penguin| 29.0| A Room of One's Own|        1922|
|       Rowling, J.K.|   Har

In [None]:
# Distinct (title, year_written) pairs whose title contains "Harry Potter".
harry_potter_books = (
    df.select("title", "year_written")
      .where("title LIKE '%Harry Potter%'")
      .distinct()
)
# truncate=False prints full cell values instead of shortening long strings.
harry_potter_books.show(n=20, truncate=False)

+------------+------------+
|title       |year_written|
+------------+------------+
|Harry Potter|2000        |
+------------+------------+



In [None]:
# Use the namespaced F.max instead of `from pyspark.sql.functions import max`,
# which would replace Python's built-in max() for every later cell in the kernel.
import pyspark.sql.functions as F

# Aggregate to a single row / single column and pull the scalar out of it.
maxValue = df.agg(F.max("price")).collect()[0][0]
print("maxValue: ",maxValue)

# Show the book(s) whose price equals that maximum.
df.select("title","price").filter(df.price == maxValue).show(20, False)

maxValue:  29.0
+-------------------+-----+
|title              |price|
+-------------------+-----+
|A Room of One's Own|29.0 |
+-------------------+-----+



In [None]:
# Convert to a pandas DataFrame so that it can be written out as a single CSV
# file, instead of the output being split according to the Spark partitions.
# NOTE: toPandas() loads all rows into the driver's memory, so use it only on
# data that is small enough to fit there.
df_pd = df.toPandas()
df_pd

Unnamed: 0,author,edition,price,title,year_written
0,"Austen, Jane",Penguin,18.2,Northanger Abbey,1814
1,"Tolstoy, Leo",Penguin,12.7,War and Peace,1865
2,"Tolstoy, Leo",Penguin,13.5,Anna Karenina,1875
3,"Woolf, Virginia",Harcourt Brace,25.0,Mrs. Dalloway,1925
4,"Cunnningham, Michael",Harcourt Brace,12.35,The Hours,1999
5,"Twain, Mark",Penguin,5.76,Huckleberry Finn,1865
6,"Dickens, Charles",Random House,5.75,Bleak House,1870
7,"Twain, Mark",Random House,7.75,Tom Sawyer,1862
8,"Woolf, Virginia",Penguin,29.0,A Room of One's Own,1922
9,"Rowling, J.K.",Harcourt Brace,19.95,Harry Potter,2000
