## Install Pyspark, pandas

In [1]:
!pip install pyspark
!pip install findspark
!pip install pyarrow==0.14.1
!pip install pandas
!pip install numpy==1.19.5

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m32.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e18e75af5297db23fcc6332327350a769640106696d390b5b9dd2f69ad8f7bdd
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/b7/8e/8f/ba5d017af5f502964eb1358e1d496a8519de1645936b01810e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.1
C

In [3]:
import findspark
findspark.init()

## Import SparkSession, SparkContext

In [6]:
from pyspark.sql import SparkSession 
from pyspark import SparkContext, SparkConf
import pandas as pd

## Intialize SparkContext, SparkSession

In [7]:
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

23/08/16 07:53:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
spark

## Read Csv file using pandas

In [9]:

mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [12]:
mtcars.head()
mtcars.rename(columns={'Unnamed: 0':'name'}, inplace=True )

## Create spark Dataframe from pandas dataframe

In [13]:
df=spark.createDataFrame(mtcars)
df.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [15]:
df.createTempView("cars")

In [18]:
spark.sql("select * from cars where mpg>20 AND cyl < 6").show(5)

+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|  Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



## Create a Pandas UDF to apply a columnar operation

In [19]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [20]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>

## Apply UDF convert_weight

In [23]:

spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|           2.62|        1.179|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|          2.875|      1.29375|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|           2.32|        1.044|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|          3.215|      1.44675|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|           3.44|        1.548|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
only showing top 5 rows



In [24]:
spark.sql("select * from cars where name like 'Merc%'").show(5)

+----------+----+---+-----+---+----+----+----+---+---+----+----+
|      name| mpg|cyl| disp| hp|drat|  wt|qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+----+---+---+----+----+
| Merc 240D|24.4|  4|146.7| 62|3.69|3.19|20.0|  1|  0|   4|   2|
|  Merc 230|22.8|  4|140.8| 95|3.92|3.15|22.9|  1|  0|   4|   2|
|  Merc 280|19.2|  6|167.6|123|3.92|3.44|18.3|  1|  0|   4|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44|18.9|  1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07|17.4|  0|  0|   3|   3|
+----------+----+---+-----+---+----+----+----+---+---+----+----+
only showing top 5 rows



In [25]:
@pandas_udf("float")
def convert_mpg(s: pd.Series) -> pd.Series:
    return s*0.425;

spark.udf.register("Convert_mpg" , convert_mpg)

spark.sql("select *, Convert_mpg(mpg) as kmpl from cars").show()



+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|   kmpl|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|  8.925|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|  8.925|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|   9.69|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|  9.095|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2| 7.9475|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1| 7.6925|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4| 6.0775|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|  10.37|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2| 