In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 74kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 46.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=d229742e621e0d3d05fac8202c8fa0dc465b780eccdd14085fbc44ba6e69cb82
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


### Create Resilient Distributed Datasets (RDD)

##### Method 1
using *parallelize()* function

In [7]:
from pyspark.sql import SparkSession

# Creating a Spark Session
spark = SparkSession \
.builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()




In [8]:
# Spark Details
spark

In [9]:
# Creating RDD using parallelize function

df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),
(4, 5, 6, 'd e f'),
(7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])


In [10]:
df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [12]:
myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])
myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

##### Method 2
using *createDateFrame()* function

In [13]:
Employee = spark.createDataFrame([
('1', 'Joe', '70000', '1'),
('2', 'Henry', '80000', '2'),
('3', 'Sam', '60000', '2'),
('4', 'Max', '90000', '1')],
['Id', 'Name', 'Sallary','DepartmentId']
)


In [15]:
Employee.show()

+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|           2|
|  3|  Sam|  60000|           2|
|  4|  Max|  90000|           1|
+---+-----+-------+------------+



##### Method 3
using *read* and *load* functions

The data for this part is used from FORCE 2020 Machine Learning Contest with Wells and Seismic

(https://www.npd.no/en/force/events/machine-learning-contest-with-wells-and-seismic/)

To know more about the data, please visit: https://xeek.ai/challenges/force-well-logs/overview

In [19]:
!pip install gdown



In [20]:
! gdown -O lecture_5_log_data.csv https://drive.google.com/uc?id=1YVq71iUjSjqyqJtkmSRbC9NLMCrx0478

Downloading...
From: https://drive.google.com/uc?id=1YVq71iUjSjqyqJtkmSRbC9NLMCrx0478
To: /content/lecture_5_log_data.csv
280MB [00:05, 54.6MB/s]


In [66]:
df = spark.read.format('csv').options(header='true', inferschema='true').load("lecture_5_log_data.csv",header=True, sep = ';')
df.show(5)
df.printSchema()


+-------+--------+------------+---------+------------+------------+---------+------------+----+------------+------------+------------+------------+----+----+------------+------------+------------+----+------------+----+----+------------+---------+----+----+----+--------------------------------+---------------------------------+
|   WELL|DEPTH_MD|       X_LOC|    Y_LOC|       Z_LOC|       GROUP|FORMATION|        CALI|RSHA|        RMED|        RDEP|        RHOB|          GR| SGR|NPHI|         PEF|         DTC|          SP|  BS|         ROP| DTS|DCAL|        DRHO|MUDWEIGHT|RMIC|ROPA| RXO|FORCE_2020_LITHOFACIES_LITHOLOGY|FORCE_2020_LITHOFACIES_CONFIDENCE|
+-------+--------+------------+---------+------------+------------+---------+------------+----+------------+------------+------------+------------+----+----+------------+------------+------------+----+------------+----+----+------------+---------+----+----+----+--------------------------------+---------------------------------+
|15/9-13| 

##### Creating Spark Dataframe using dict

In [34]:
d = {'A': [0, 1, 0],
'B': [1, 0, 1],
'C': [1, 0, 0]}


In [36]:
import numpy as np
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



In [67]:
print (df.head(4))

print (df.columns)

print (df.dtypes)

[Row(WELL='15/9-13', DEPTH_MD=494.528, X_LOC=437641.96875, Y_LOC=6470972.5, Z_LOC=-469.5018311, GROUP='NORDLAND GP.', FORMATION=None, CALI=19.480834961, RSHA=None, RMED=1.6114097834, RDEP=1.7986813784, RHOB=1.884185791, GR=80.20085144, SGR=None, NPHI=None, PEF=20.915468216, DTC=161.13117981, SP=24.612379074, BS=None, ROP=34.63640976, DTS=None, DCAL=None, DRHO=-0.574927628, MUDWEIGHT=None, RMIC=None, ROPA=None, RXO=None, FORCE_2020_LITHOFACIES_LITHOLOGY=65000, FORCE_2020_LITHOFACIES_CONFIDENCE=1.0), Row(WELL='15/9-13', DEPTH_MD=494.68, X_LOC=437641.96875, Y_LOC=6470972.5, Z_LOC=-469.6538086, GROUP='NORDLAND GP.', FORMATION=None, CALI=19.468799591, RSHA=None, RMED=1.6180702448, RDEP=1.7956413031, RHOB=1.8897935152, GR=79.262886047, SGR=None, NPHI=None, PEF=19.383012772, DTC=160.60346985, SP=23.895530701, BS=None, ROP=34.63640976, DTS=None, DCAL=None, DRHO=-0.570188403, MUDWEIGHT=None, RMIC=None, ROPA=None, RXO=None, FORCE_2020_LITHOFACIES_LITHOLOGY=65000, FORCE_2020_LITHOFACIES_CONFIDENC

In [79]:
print('Shape of Pandas: ',df.toPandas().shape)

Shape of Pandas:  (1170511, 29)


In [80]:
# Finding null values in a column
from pyspark.sql.functions import col
df.where(col("NPHI").isNull()).toPandas().shape



(405102, 29)

In [88]:
# Renaming Column Names
mapping = {'FORMATION':'formation','DEPTH_MD':'depth'}
new_names = [mapping.get(col,col) for col in df.columns]

df.toDF(*new_names).show(4)


+-------+-------+------------+---------+------------+------------+---------+------------+----+------------+------------+------------+------------+----+----+------------+------------+------------+----+------------+----+----+------------+---------+----+----+----+--------------------------------+---------------------------------+
|   WELL|  depth|       X_LOC|    Y_LOC|       Z_LOC|       GROUP|formation|        CALI|RSHA|        RMED|        RDEP|        RHOB|          GR| SGR|NPHI|         PEF|         DTC|          SP|  BS|         ROP| DTS|DCAL|        DRHO|MUDWEIGHT|RMIC|ROPA| RXO|FORCE_2020_LITHOFACIES_LITHOLOGY|FORCE_2020_LITHOFACIES_CONFIDENCE|
+-------+-------+------------+---------+------------+------------+---------+------------+----+------------+------------+------------+------------+----+----+------------+------------+------------+----+------------+----+----+------------+---------+----+----+----+--------------------------------+---------------------------------+
|15/9-13|494.