<a href="https://colab.research.google.com/github/Tanuja2725/vm/blob/main/spark_DataFrames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Last amended: 16th April,2023
# Myfolder: github/hadoop
# Objectives:
#       i)  Install pyspark on colab
#       ii) Install koalas on colab
#           (version installed is spark-3.3.2)
#
# Java 8 install: https://stackoverflow.com/a/58191107
# Hadoop install: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
# Spark install:  https://stackoverflow.com/a/64183749
#                 https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

# Spark Reference API
a. [Quickstart](https://spark.apache.org/docs/latest/quick-start.html) <br>
b. DataFrame [API list](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html) at a glance<br>
c. Pandas [API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html) on Spark <br>
d. Also look at [the source code](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html) of `pyspark.sql.functions`, which includes usage examples<br>



# A. Full spark install
Installs PySpark from the `spark-3.3.2-bin-hadoop3` distribution.<br>
The install takes less than a minute.

### 1.0 Libraries

In [2]:
# 1.0 os sets environment variables; time measures how long the install takes
import os  
import time  

## 2.0 Define some functions

#### ssh_install()

In [3]:
# 2.0 Function to install ssh client and sshd (Server)
def ssh_install():
  print("\n--1. Download and install ssh server----\n")
  ! sudo apt-get remove openssh-client openssh-server
  ! sudo apt install openssh-client openssh-server
  
  print("\n--2. Restart ssh server----\n")
  ! service ssh restart

#### Java install

In [4]:
# 3.0 Function to download and install java 8
def install_java():
  ! rm -rf /usr/java

  print("\n--Download and install Java 8----\n")
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null        # install openjdk
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     # set environment variable

  !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
  !update-alternatives --set javac /usr/lib/jvm/java-8-openjdk-amd64/bin/javac
  
  !mkdir -p /usr/java
  ! ln -s "/usr/lib/jvm/java-8-openjdk-amd64"  "/usr/java"
  ! mv "/usr/java/java-8-openjdk-amd64"  "/usr/java/latest"
  
  !java -version       #check java version
  !javac -version

#### setup ssh passphrase

In [5]:
# 6.0 Function to set up ssh keys with an empty passphrase
def set_keys():
  print("\n---22. Generate SSH keys----\n")
  ! cd ~ ; pwd 
  ! cd ~ ; ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
  ! cd ~ ; cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  ! cd ~ ; chmod 0600 ~/.ssh/authorized_keys


#### Set environment

In [6]:
# 7.0 Function to set up environmental variables
def set_env():
  print("\n---23. Set Environment variables----\n")
  # 'export' command does not work in colab
  # https://stackoverflow.com/a/57240319
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     #set environment variable
  os.environ["JRE_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/jre"   
  

#### function to install prerequisites
java and ssh<br>


In [7]:
# 8.0 Function to call all functions
def install_components():
  print("\n--Install java----\n")
  ssh_install()
  install_java()  
  #set_keys()
  set_env()


## 3.0 Install components
Download, install and configure the components. This takes around 2 minutes.<br>
Your <u>input *'y'* is required</u> at one place while overwriting earlier ssh keys.

In [8]:
# 9.0 Start installation
start = time.time()
install_components()
end = time.time()
print("\n---Time taken----\n")
print((end- start)/60)


--Install java----


--1. Download and install ssh server----

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Package 'openssh-server' is not installed, so not removed
The following packages were automatically installed and are no longer required:
  libboost-atomic-dev libboost-atomic1.71-dev libboost-atomic1.71.0
  libboost-chrono-dev libboost-chrono1.71-dev libboost-chrono1.71.0
  libboost-container-dev libboost-container1.71-dev libboost-container1.71.0
  libboost-context-dev libboost-context1.71-dev libboost-context1.71.0
  libboost-coroutine-dev libboost-coroutine1.71-dev libboost-coroutine1.71.0
  libboost-date-time-dev libboost-date-time1.71-dev libboost-date-time1.71.0
  libboost-exception-dev libboost-exception1.71-dev libboost-fiber-dev
  libboost-fiber1.71-dev libboost-fiber1.71.0 libboost-filesystem-dev
  libboost-filesystem1.71-dev libboost-filesystem1.71.0 libboost-graph-dev
  libboost-graph-parallel-dev libboost-graph-par

## 4.0 Install spark


### Define functions

`findspark`: PySpark isn't on `sys.path` by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding `pyspark` to `sys.path` at runtime. `findspark` does the latter.
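
To make the "adding to `sys.path` at runtime" idea concrete, here is a minimal sketch that uses only the standard library. It is not `findspark`'s actual implementation. The helper name `add_pyspark_to_path` is hypothetical, and the sketch assumes the usual layout of a Spark distribution, with the Python sources under `<SPARK_HOME>/python` and a bundled `py4j-*-src.zip` under `<SPARK_HOME>/python/lib`.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend Spark's Python sources to sys.path (hypothetical helper).

    Assumes the usual layout of a Spark distribution:
    <spark_home>/python and <spark_home>/python/lib/py4j-*-src.zip.
    Returns the list of entries that were newly added.
    """
    python_dir = os.path.join(spark_home, "python")
    # py4j ships as a zip of sources; zip files are valid sys.path entries.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    added = []
    for entry in [python_dir] + py4j_zips:
        if entry not in sys.path:
            sys.path.insert(0, entry)
            added.append(entry)
    return added
```

In this notebook, calling `add_pyspark_to_path(os.environ["SPARK_HOME"])` would play roughly the role that `findspark.init()` plays later on, after which `import pyspark` can resolve against the unpacked distribution.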

In [9]:
# 1.0 Function to download and unzip spark
def spark_koalas_install():
  print("\n--1.1 Install findspark----\n")
  !pip install -q findspark

  #print("\n--1.2 Install databricks Koalas----\n")
  #!pip install koalas
  
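  # dlcdn.apache.org hosts only current releases; if this link returns 404,
  # look for the same file under https://archive.apache.org/dist/spark/spark-3.3.2/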
  print("\n--1.3 Download Apache tar.gz----\n")
            
  ! wget -c https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz 

  print("\n--1.4 Transfer downloaded content and unzip tar.gz----\n")
  !  mv /content/spark*   /opt/
  ! tar -xzf /opt/spark-3.3.2-bin-hadoop3.tgz  --directory /opt/

  print("\n--1.5 Check folder for files----\n")
  ! ls -la /opt


In [10]:
# 1.1 Function to set environment
def set_spark_env():
  print("\n---2. Set Environment variables----\n")
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
  os.environ["JRE_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/jre" 
  os.environ["SPARK_HOME"] = "/opt/spark-3.3.2-bin-hadoop3" 
  os.environ["SPARK_CONF_DIR"] = "/opt/spark-3.3.2-bin-hadoop3/conf"     
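  # LD_LIBRARY_PATH may be unset, so read it with a default instead of using +=
  os.environ["LD_LIBRARY_PATH"] = os.environ.get("LD_LIBRARY_PATH", "") + ":/opt/spark-3.3.2-bin-hadoop3/lib/native"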
  os.environ["PATH"] += ":/opt/spark-3.3.2-bin-hadoop3/bin:/opt/spark-3.3.2-bin-hadoop3/sbin"
  print("\n---2.1. Check Environment variables----\n")
  # Check
  ! echo $PATH
  ! echo $LD_LIBRARY_PATH

In [11]:
# 1.2 Function to configure spark 
def spark_conf():
  print("\n---3. Configure spark to access hadoop----\n")
  !mv /opt/spark-3.3.2-bin-hadoop3/conf/spark-env.sh.template  /opt/spark-3.3.2-bin-hadoop3/conf/spark-env.sh
  #!echo "HADOOP_CONF_DIR=/opt/hadoop-3.2.2/etc/hadoop/" >> /opt/spark-3.3.2-bin-hadoop3/conf/spark-env.sh
  print("\n---3.1 Check ----\n")
  #!cat /opt/spark-3.3.2-bin-hadoop3/conf/spark-env.sh

### Install spark


In [12]:
# 2.0 Call all the three functions
def install_spark():
  spark_koalas_install()
  set_spark_env()
  spark_conf()


In [13]:
# 2.1 
install_spark()


--1.1 Install findspark----


--1.3 Download Apache tar.gz----

--2023-04-19 14:49:43--  https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299360284 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.2-bin-hadoop3.tgz’


2023-04-19 14:49:45 (186 MB/s) - ‘spark-3.3.2-bin-hadoop3.tgz’ saved [299360284/299360284]


--1.4 Transfer downloaded content and unzip tar.gz----


--1.5 Check folder for files----

total 292368
drwxr-xr-x  1 root root      4096 Apr 19 14:49 .
drwxr-xr-x  1 root root      4096 Apr 19 14:48 ..
drwxr-xr-x  1 root root      4096 Apr 14 13:50 google
drwxr-xr-x  1 root root      4096 Feb  2 05:25 nvidia
drwxr-xr-x 13  501 1000      4096 Feb 10 20:40 spark-3.3.2-bin-hadoop3
-rw-r--r--  1 root root 299360284 Feb 10 21:28 spark-3.3.2-bin-had

# B. Call libraries
We do not import `pandas` but `pyspark.pandas` as `ps`.

In [14]:
# 3.0 Just call some libraries to test
import numpy as np
import os
import time 

# 3.1 Get spark in sys.path
import findspark
findspark.init()

# 3.2 Call other spark libraries
#     Just to test
import pyspark.pandas as ps
from pyspark.sql import SparkSession
#import databricks.koalas as ks



In [15]:
# 3.3
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [16]:
# 3.4 Increase cell width to display wide columnar output
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

# C. Build spark session
You can modify spark driver/executor memory here<br>
SparkSession name is `spark`<br>
`pyspark.pandas` is imported as `ps` (the `databricks.koalas` import is commented out)


## Modifying spark configuration
Increase driver and executor memory

In [17]:
# 4.0 Check template file
! cat /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf.template

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enable

In [18]:
# 4.1 Create spark-defaults.conf 
! cp /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf.template  /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf


In [19]:
# 4.2 Amend properties
! echo "spark.driver.memory 6g" >> /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf
! echo "spark.executor.memory 3g" >> /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf
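
Each property appended above sits on its own line as a key, whitespace, and a value, while lines starting with `#` are comments. The sketch below reads such a file back into a dictionary so the memory settings can be checked from Python. `read_spark_defaults` is a hypothetical helper, not part of Spark, and it assumes only this simple `key value` form.

```python
def read_spark_defaults(path):
    """Parse 'key value' lines into a dict (hypothetical helper).

    Blank lines and lines starting with '#' are skipped. If a key
    appears more than once, the last occurrence wins in this sketch.
    """
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split(None, 1)  # split on the first run of whitespace
            key = parts[0]
            value = parts[1].strip() if len(parts) > 1 else ""
            props[key] = value
    return props
```

Applied to `/opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf` after the two `echo` commands, the result would be expected to hold `spark.driver.memory` and `spark.executor.memory` with the values `6g` and `3g`.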

In [20]:
# 4.3 Check now
! cat /opt/spark-3.3.2-bin-hadoop3/conf/spark-defaults.conf

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enable

## Stop and start SparkSession

In [21]:
# 5.0 Build spark session:
# Stop spark, if started
#spark.stop()
# 5.1 Now start spark
spark = SparkSession. \
                    builder. \
                    master("local[*]"). \
                    appName("myexpt"). \
                    getOrCreate()

In [22]:
sc = spark.sparkContext
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.executor.memory', '3g'),
 ('spark.app.name', 'myexpt'),
 ('spark.app.id', 'local-1681915798217'),
 ('spark.executor.id', 'driver'),
 ('spark.app.submitTime', '1681915795447'),
 ('spark.driver.port', '38377'),
 ('spark.driver

In [23]:
# 5.2
print(spark.sparkContext._conf.getAll())

[('spark.driver.extraJavaOptions', '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'), ('spark.executor.memory', '3g'), ('spark.app.name', 'myexpt'), ('spark.app.id', 'local-1681915798217'), ('spark.executor.id', 'driver'), ('spark.app.submitTime', '1681915795447'), ('spark.driver.port', '38377'), ('spark.driver.memory',

# D. Experimentation with pyspark pandas API
pyspark pandas is imported as `ps`

### pyspark pandas Series

In [None]:
# 1.0
ps.Series(np.random.random(size = (10,)))

0    0.517089
1    0.873551
2    0.847714
3    0.024512
4    0.328187
5    0.142017
6    0.800549
7    0.679741
8    0.187777
9    0.332397
dtype: float64

In [None]:
# 1.0.1
ps.Series([10,20,30])

0    10
1    20
2    30
dtype: int64

In [None]:
# 1.0.2
ps.Series((4,9,1))

0    4
1    9
2    1
dtype: int64

In [None]:
# 1.0.3
ps.Series(np.random.random(size = (5)), index = ['a', 'b', 'c', 'd', 'e'])

a    0.964787
b    0.336336
c    0.542865
d    0.801836
e    0.679892
dtype: float64

In [None]:
# 1.0.4
ps.Series(np.random.random(size = (5)), index = ['a', 'b', 'c', 'd', 'e']).shape

(5,)

In [None]:
# 1.0.5
dict(ps.Series(np.random.random(size = (5)), index = ['a', 'b', 'c', 'd', 'e']))

In [None]:
# 1.0.6
# Series to numpy:
# Note the warnings
ps.Series(np.random.random(size = (5)), index = ['a', 'b', 'c', 'd', 'e']).to_numpy()



array([0.56726725, 0.32933734, 0.01821741, 0.16558889, 0.14302725])

In [None]:
# 1.1
abc = ps.Series(np.random.random(size = (5)), index = ['a', 'b', 'c', 'd', 'e'])


In [None]:
# 1.2 In an interactive session, type `abc.` and press Tab
#     to see which methods are available:

# abc.

In [None]:
# 1.3
abc.add(abc)

a    1.766233
b    0.250937
c    1.370136
d    1.962745
e    1.675979
dtype: float64

### pyspark pandas DataFrame

In [None]:
# 2.1
ps.DataFrame([
               [1,2,3],
               [4,5,6]
              ], columns = list('abc') )

Unnamed: 0,a,b,c
0,1,2,3
1,4,5,6


In [None]:
# 2.1.1
ps.DataFrame(
              [
                [1,2,3],
                [4,5,6]
              ],
              columns = list('abc'),
              index = [4,7]
             )

Unnamed: 0,a,b,c
4,1,2,3
7,4,5,6


In [None]:
# 2.1.2
#     Note how nulls have been inserted.
#     As np.nan and None

pdf = ps.DataFrame(
                   {
                      'x1': ['a','a','b','b',np.nan, 'c', 'd','d'] * 10,
                      'x2': ['apple',np.nan, 'orange','orange', 'peach',np.nan,'apple','orange'] * 10,
                      'x3': [1, 1, 2, 2, 2, 4, 1, 2] *10,
                      'x4': [2.4, None, 3.5, 1.4, 2.1,1.5, 3.0, 2.0] *10,
                      'y1': [1, 0, 1, 0, 0, 1, 1, 0] * 10,
                      'y2': ['yes', 'no', 'no',np.nan,None, 'yes','', 'yes'] *10,
                      'y3': [True, True, False, None, False, True, False,False] *10
                    }
                   )



In [None]:
# 2.1.3
type(pdf)

print("\n======\n")

# 2.1.4
pdf.head()

In [None]:
# 2.1.5
pdf.columns

Index(['x1', 'x2', 'x3', 'x4', 'y1', 'y2', 'y3'], dtype='object')

In [None]:
# 2.1.6
pdf['x1'].head(6)

print("\n======\n")

# 2.1.7
type(pdf['x1'])    # pyspark.pandas.series.Series

0       a
1       a
2       b
3       b
4    None
5       c
Name: x1, dtype: object





pyspark.pandas.series.Series

In [None]:
# 2.2
pdf[['x1', 'x2']].head()

print("\n======\n")

# 2.3
type(pdf[['x1', 'x2']])   # pyspark.pandas.frame.DataFrame

Unnamed: 0,x1,x2
0,a,apple
1,a,
2,b,orange
3,b,orange
4,,peach






pyspark.pandas.frame.DataFrame

In [None]:
# 2.3.1
pdf.shape   # (80,7)

print("\n======\n")

# 2.3.2
pdf.dtypes

(80, 7)





x1     object
x2     object
x3      int64
x4    float64
y1      int64
y2     object
y3     object
dtype: object

In [None]:
# 2.4 Only numeric features are described:

pdf.describe()

Unnamed: 0,x3,x4,y1
count,80.0,70.0,80.0
mean,1.875,2.271429,0.5
std,0.932874,0.713664,0.503155
min,1.0,1.4,0.0
25%,1.0,1.5,0.0
50%,2.0,2.1,0.0
75%,2.0,3.0,1.0
max,4.0,3.5,1.0


In [None]:
# 2.5 Describing cat features:

pdf[['x1', 'x2']].describe()

Unnamed: 0,x1,x2
count,70,60
unique,4,3
top,d,orange
freq,20,30


In [None]:
# 2.5.1
pdf.info()

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 80 entries, 0 to 79
Data columns (total 7 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   x1      70 non-null     object 
 1   x2      60 non-null     object 
 2   x3      80 non-null     int64  
 3   x4      70 non-null     float64
 4   y1      80 non-null     int64  
 5   y2      60 non-null     object 
 6   y3      70 non-null     object 
dtypes: float64(1), int64(2), object(4)

In [None]:
# 3.0 Slicing:

pdf.iloc[:2, :4]

Unnamed: 0,x1,x2,x3,x4
0,a,apple,1,2.4
1,a,,1,


In [None]:
# 3.1 Slicing:

pdf.iloc[[1,3,7], [2,4]]

Unnamed: 0,x3,y1
1,1,0
3,2,0
7,2,0


In [None]:
# 3.2 Slicing:

pdf.loc[[1,3,7], ['x1', 'x2']]

Unnamed: 0,x1,x2
1,a,
3,b,orange
7,d,orange


### Operations on Series

In [None]:
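# 4.0 Addition
# See https://stackoverflow.com/a/73701464
# The two Series come from different frames, so this raises a ValueError
# until the option in 4.1 is set.
ps.Series(np.random.random(size = (10,))) + ps.Series(np.random.random(size = (10,)))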

In [None]:
# 4.1 Set option: Default is False to avoid excessive computation:

ps.set_option('compute.ops_on_diff_frames', True)

In [None]:
# 4.2
ps.Series(np.random.random(size = (10,))) + ps.Series(np.random.random(size = (10,)))

0    1.036895
1    0.330829
2    0.493790
3    1.170752
4    0.985252
5    1.285973
6    1.422313
7    0.321337
8    0.828303
9    0.749595
dtype: float64

In [None]:
# 4.3 Add two series with different Index
abc = ps.Series(
                 np.random.normal(loc = 1.0, scale = 2.0, size = (5,) ),
                 index = ['a', 'c', 'b', 'e', 'f']
                )

In [None]:
# 4.4 Second series, with a partly different index
cde = ps.Series(
                 np.random.normal(loc = 1.0, scale = 2.0, size = (5,) ),
                 index = ['a', 'b', 'c', 'e', 'k']
                )

In [None]:
abc
print("\n========\n")
cde
print("\n========\n")
abc + cde

a   -0.855975
c    3.015098
b   -0.675212
e    6.212318
f    1.581511
dtype: float64





a    1.676420
b   -0.172374
c    3.272379
e   -4.057456
k    6.525404
dtype: float64





a    0.820444
b   -0.847586
c    6.287476
e    2.154862
f         NaN
k         NaN
dtype: float64
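
The `NaN` entries for `f` and `k` come from index alignment: the sum is computed label by label over the union of both indexes, and a label missing from either side has no partner to add. Below is a minimal pure-Python sketch of that rule, with plain dictionaries standing in for the two Series. The function name `add_aligned` and the sample values are illustrative and not part of pyspark; labels are sorted only to mirror the output shown above.

```python
import math


def add_aligned(left, right):
    """Add two label->value mappings the way aligned Series addition behaves.

    The result covers the sorted union of labels; a label present on only
    one side yields NaN.
    """
    result = {}
    for label in sorted(set(left) | set(right)):
        if label in left and label in right:
            result[label] = left[label] + right[label]
        else:
            result[label] = math.nan
    return result


# Illustrative values using the same index labels as abc and cde.
abc_like = {"a": 1.0, "c": 2.0, "b": 3.0, "e": 4.0, "f": 5.0}
cde_like = {"a": 10.0, "b": 20.0, "c": 30.0, "e": 40.0, "k": 50.0}
total = add_aligned(abc_like, cde_like)
print(total)
```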

### DataFrame operations

In [None]:
tr =  ps.DataFrame(  [
                        [4,5,6],
                        [1.2,1.3,1.4]
                     ],
                     columns = list('abc')
                   )

In [None]:
tr

Unnamed: 0,a,b,c
0,4.0,5.0,6.0
1,1.2,1.3,1.4


In [None]:
tr['xyz'] = [8,9]

In [None]:
tr

Unnamed: 0,a,b,c,xyz
0,4.0,5.0,6.0,8
1,1.2,1.3,1.4,9


In [None]:
tr.drop(columns = ['xyz'])
tr.drop(['xyz'], axis = 1)

Unnamed: 0,a,b,c
0,4.0,5.0,6.0
1,1.2,1.3,1.4


Unnamed: 0,a,b,c
0,4.0,5.0,6.0
1,1.2,1.3,1.4


In [None]:
tr.rename(columns = {'xyz' : 'cde'})

Unnamed: 0,a,b,c,cde
0,4.0,5.0,6.0,8
1,1.2,1.3,1.4,9


In [None]:
df = ps.DataFrame([
                    ('bird', 389.0),
                    ('bird', 24.0),
                    ('mammal', 80.5),
                    ('mammal', np.nan)],
                  index=['falcon', 'parrot', 'lion', 'monkey'],
                  columns=('class', 'max_speed')
                  )

df

Unnamed: 0,class,max_speed
falcon,bird,389.0
parrot,bird,24.0
lion,mammal,80.5
monkey,mammal,


In [None]:
df.reset_index()
df.reset_index(drop = True)

Unnamed: 0,index,class,max_speed
0,falcon,bird,389.0
1,parrot,bird,24.0
2,lion,mammal,80.5
3,monkey,mammal,


Unnamed: 0,class,max_speed
0,bird,389.0
1,bird,24.0
2,mammal,80.5
3,mammal,


#### Filtering

In [None]:
ps.date_range(start = '01/01/2001', end = "01/01/2002", periods = 24)



DatetimeIndex([       '2001-01-01 00:00:00', '2001-01-16 20:52:10.434782',
               '2001-02-01 17:44:20.869565', '2001-02-17 14:36:31.304347',
               '2001-03-05 11:28:41.739130', '2001-03-21 08:20:52.173913',
               '2001-04-06 05:13:02.608695', '2001-04-22 02:05:13.043478',
               '2001-05-07 22:57:23.478260', '2001-05-23 19:49:33.913043',
               '2001-06-08 16:41:44.347826', '2001-06-24 13:33:54.782608',
               '2001-07-10 10:26:05.217391', '2001-07-26 07:18:15.652173',
               '2001-08-11 04:10:26.086956', '2001-08-27 01:02:36.521739',
               '2001-09-11 21:54:46.956521', '2001-09-27 18:46:57.391304',
               '2001-10-13 15:39:07.826086', '2001-10-29 12:31:18.260869',
               '2001-11-14 09:23:28.695652', '2001-11-30 06:15:39.130434',
               '2001-12-16 03:07:49.565217',        '2002-01-01 00:00:00'],
              dtype='datetime64[ns]', freq=None)



In [None]:
ux = ps.date_range(start = '01-01-2001', end = '01-01-2002', periods = 30)



In [None]:
ux

DatetimeIndex([       '2001-01-01 00:00:00', '2001-01-13 14:04:08.275862',
               '2001-01-26 04:08:16.551724', '2001-02-07 18:12:24.827586',
               '2001-02-20 08:16:33.103448', '2001-03-04 22:20:41.379310',
               '2001-03-17 12:24:49.655172', '2001-03-30 02:28:57.931034',
               '2001-04-11 16:33:06.206896', '2001-04-24 06:37:14.482758',
               '2001-05-06 20:41:22.758620', '2001-05-19 10:45:31.034482',
               '2001-06-01 00:49:39.310344', '2001-06-13 14:53:47.586206',
               '2001-06-26 04:57:55.862068', '2001-07-08 19:02:04.137931',
               '2001-07-21 09:06:12.413793', '2001-08-02 23:10:20.689655',
               '2001-08-15 13:14:28.965517', '2001-08-28 03:18:37.241379',
               '2001-09-09 17:22:45.517241', '2001-09-22 07:26:53.793103',
               '2001-10-04 21:31:02.068965', '2001-10-17 11:35:10.344827',
               '2001-10-30 01:39:18.620689', '2001-11-11 15:43:26.896551',
               '2001-11-2

In [None]:
gx = ps.DataFrame(np.random.random(size = (30,)))
gx

In [None]:
gx['tr'] = ux

In [None]:
gx.columns

Index([0, 'tr'], dtype='object')

In [None]:
ux.dtype   # dtype of the DatetimeIndex created above (`ind` is not defined in this notebook)

dtype('<M8[ns]')

In [None]:
ps.DataFrame(np.random.random(size = (30, 5)) )

# E. SparkSQL engine

In [24]:
# Create a pyspark pandas DataFrame
#     Note how nulls have been inserted.
#     As np.nan and None

pdf = ps.DataFrame(
                   {
                      'x1': ['a','a','b','b',np.nan, 'c', 'd','d'] * 10,
                      'x2': ['apple',np.nan, 'orange','orange', 'peach',np.nan,'apple','orange'] * 10,
                      'x3': [1, 1, 2, 2, 2, 4, 1, 2] *10,
                      'x4': [2.4, None, 3.5, 1.4, 2.1,1.5, 3.0, 2.0] *10,
                      'y1': [1, 0, 1, 0, 0, 1, 1, 0] * 10,
                      'y2': ['yes', 'no', 'no',np.nan,None, 'yes','', 'yes'] *10,
                      'y3': [True, True, False, None, False, True, False,False] *10
                    }
                   )



In [None]:
# Have a look:
pdf.head()

Unnamed: 0,x1,x2,x3,x4,y1,y2,y3
0,a,apple,1,2.4,1,yes,True
1,a,,1,,0,no,True
2,b,orange,2,3.5,1,no,False
3,b,orange,2,1.4,0,,
4,,peach,2,2.1,0,,False


In [None]:
pdf.index

Int64Index([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
            17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
            34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
            68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79],
           dtype='int64')

In [None]:
# Transform to pyspark SQL Dataframe

pdf_sc = pdf.to_spark()
type(pdf_sc)



pyspark.sql.dataframe.DataFrame

In [None]:
# Show the dataframe
pdf_sc.show(3)
print("\n------\n")
pdf_sc.head(3)
print("\n------\n")
pdf_sc.collect()  # Returns all rows to the driver as a list of Row objects


+---+------+---+----+---+---+-----+
| x1|    x2| x3|  x4| y1| y2|   y3|
+---+------+---+----+---+---+-----+
|  a| apple|  1| 2.4|  1|yes| true|
|  a|  null|  1|null|  0| no| true|
|  b|orange|  2| 3.5|  1| no|false|
+---+------+---+----+---+---+-----+
only showing top 3 rows


------



[Row(x1='a', x2='apple', x3=1, x4=2.4, y1=1, y2='yes', y3=True),
 Row(x1='a', x2=None, x3=1, x4=None, y1=0, y2='no', y3=True),
 Row(x1='b', x2='orange', x3=2, x4=3.5, y1=1, y2='no', y3=False)]


------



[Row(x1='a', x2='apple', x3=1, x4=2.4, y1=1, y2='yes', y3=True),
 Row(x1='a', x2=None, x3=1, x4=None, y1=0, y2='no', y3=True),
 Row(x1='b', x2='orange', x3=2, x4=3.5, y1=1, y2='no', y3=False),
 Row(x1='b', x2='orange', x3=2, x4=1.4, y1=0, y2=None, y3=None),
 Row(x1=None, x2='peach', x3=2, x4=2.1, y1=0, y2=None, y3=False),
 Row(x1='c', x2=None, x3=4, x4=1.5, y1=1, y2='yes', y3=True),
 Row(x1='d', x2='apple', x3=1, x4=3.0, y1=1, y2='', y3=False),
 Row(x1='d', x2='orange', x3=2, x4=2.0, y1=0, y2='yes', y3=False),
 Row(x1='a', x2='apple', x3=1, x4=2.4, y1=1, y2='yes', y3=True),
 Row(x1='a', x2=None, x3=1, x4=None, y1=0, y2='no', y3=True),
 Row(x1='b', x2='orange', x3=2, x4=3.5, y1=1, y2='no', y3=False),
 Row(x1='b', x2='orange', x3=2, x4=1.4, y1=0, y2=None, y3=None),
 Row(x1=None, x2='peach', x3=2, x4=2.1, y1=0, y2=None, y3=False),
 Row(x1='c', x2=None, x3=4, x4=1.5, y1=1, y2='yes', y3=True),
 Row(x1='d', x2='apple', x3=1, x4=3.0, y1=1, y2='', y3=False),
 Row(x1='d', x2='orange', x3=2, x4=

In [None]:
# How many rows and columns
pdf_sc.columns
print("\n------\n")
len(pdf_sc.columns)
print("\n------\n")
pdf_sc.count()

['x1', 'x2', 'x3', 'x4', 'y1', 'y2', 'y3']


------



7


------



80

In [None]:
pdf[['x1','x2']].head()

Unnamed: 0,x1,x2
0,a,apple
1,a,
2,b,orange
3,b,orange
4,,peach


In [None]:
# Select columns by position with the pandas-style API
pdf.iloc[:, 2:5].head()

Unnamed: 0,x3,x4,y1
0,1,2.4,1
1,1,,0
2,2,3.5,1
3,2,1.4,0
4,2,2.1,0


In [None]:
# Show specific columns and rows:
pdf_sc.select('x1', 'x2').show(3)
print("\n------\n")
pdf_sc.select(pdf_sc.columns[2:4]).show(3)

+---+------+
| x1|    x2|
+---+------+
|  a| apple|
|  a|  null|
|  b|orange|
+---+------+
only showing top 3 rows


------

+---+----+---+---+-----+
| x3|  x4| y1| y2|   y3|
+---+----+---+---+-----+
|  1| 2.4|  1|yes| true|
|  1|null|  0| no| true|
|  2| 3.5|  1| no|false|
+---+----+---+---+-----+
only showing top 3 rows



In [None]:
f=pdf_sc.columns[2:4]

In [None]:
f.append('y3')

In [None]:
f

['x3', 'x4', 'y3']

In [None]:
pdf_sc.describe().show()

+-------+----+-----+------------------+------------------+------------------+----+
|summary|  x1|   x2|                x3|                x4|                y1|  y2|
+-------+----+-----+------------------+------------------+------------------+----+
|  count|  70|   60|                80|                70|                80|  60|
|   mean|null| null|             1.875|2.2714285714285714|               0.5|null|
| stddev|null| null|0.9328736058850428|0.7136643259868996|0.5031546054266275|null|
|    min|   a|apple|                 1|               1.4|                 0|    |
|    max|   d|peach|                 4|               3.5|                 1| yes|
+-------+----+-----+------------------+------------------+------------------+----+



In [None]:
pdf_sc.dtypes

[('x1', 'string'),
 ('x2', 'string'),
 ('x3', 'bigint'),
 ('x4', 'double'),
 ('y1', 'bigint'),
 ('y2', 'string'),
 ('y3', 'boolean')]

In [None]:
pdf_sc.schema

StructType([StructField('x1', StringType(), True), StructField('x2', StringType(), True), StructField('x3', LongType(), False), StructField('x4', DoubleType(), True), StructField('y1', LongType(), False), StructField('y2', StringType(), True), StructField('y3', BooleanType(), True)])

See this guide to [`where()` and `filter()`](https://sparkbyexamples.com/pyspark/pyspark-where-filter/)

In [None]:
# where() is an alias for filter(), so either can be used

In [None]:
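# Conditions can be combined with & and |, or built with Column.isin(...)

# Filter with a SQL expression string
pdf_sc.select("x3").where("x3 > 2.0").show()

# Same filter with a Column expression
pdf_sc.select("x3").where(pdf_sc.x3 > 2.0).show()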

+---+
| x3|
+---+
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
+---+

+---+
| x3|
+---+
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
|  4|
+---+



In [None]:
# Another filter: keep rows whose x3 is 1 or 2, using isin()
pdf_sc.select("x3").where(pdf_sc.x3.isin(1.0,2.0)).show()

+---+
| x3|
+---+
|  1|
|  1|
|  2|
|  2|
|  2|
|  1|
|  2|
|  1|
|  1|
|  2|
|  2|
|  2|
|  1|
|  2|
|  1|
|  1|
|  2|
|  2|
|  2|
|  1|
+---+
only showing top 20 rows



In [None]:
pdf.head()

Unnamed: 0,x1,x2,x3,x4,y1,y2,y3
0,a,apple,1,2.4,1,yes,True
1,a,,1,,0,no,True
2,b,orange,2,3.5,1,no,False
3,b,orange,2,1.4,0,,
4,,peach,2,2.1,0,,False


In [None]:
#SELECT id, sum(quantity) FROM dealer GROUP BY id ORDER BY id;
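
The statement in the comment groups rows by `id`, sums `quantity` within each group, and sorts the result by `id`. As a quick way to see that behaviour without a Spark session, the sketch below runs the same statement through Python's built-in `sqlite3`, used here purely as a stand-in SQL engine. The `dealer` table layout and its rows are made up for illustration; in this notebook the query text would instead be passed to `spark.sql(...)` against a registered temp view.

```python
import sqlite3

# In-memory database with a hypothetical dealer table (illustrative rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dealer (id INTEGER, quantity INTEGER)")
conn.executemany(
    "INSERT INTO dealer VALUES (?, ?)",
    [(100, 10), (200, 5), (100, 7), (300, 8), (200, 20)],
)

# One output row per id, with quantities summed inside each group.
rows = conn.execute(
    "SELECT id, sum(quantity) FROM dealer GROUP BY id ORDER BY id"
).fetchall()
print(rows)
conn.close()
```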

In [None]:
pdf_sc.groupBy('x1').agg({"x3": "min", "y1": "max"}).show()

In [None]:
# Create a pyspark pandas DataFrame
#     Note how nulls have been inserted.
#     As np.nan and None

pdf = ps.DataFrame(
                   {
                      'x1': ['a','a','b','b',np.nan, 'c', 'd','d'] * 10,
                      'x2': ['apple',np.nan, 'orange','orange', 'peach',np.nan,'apple','orange'] * 10,
                      'x3': [1, 1, 2, 2, 2, 4, 1, 2] *10,
                      'x4': [2.4, None, 3.5, 1.4, 2.1,1.5, 3.0, 2.0] *10,
                      'y1': [1, 0, 1, 0, 0, 1, 1, 0] * 10,
                      'y2': ['yes', 'no', 'no',np.nan,None, 'yes','', 'yes'] *10,
                      'y3': [True, True, False, None, False, True, False,False] *10
                    }
                   )



In [None]:
# `xyz` is not defined in this notebook; converting the `pdf` created above instead
uxt = pdf.to_spark()

In [None]:
pdf_sc.createOrReplaceTempView("abc")



In [None]:
spark.sql("select * from abc").show()
spark.sql("select * from abc where  x1 ='d' ").show()
spark.sql("select * from abc where  x3>2 ").show()
spark.sql("select distinct(x1) from abc ") .show()
# show() must be called; a bare `.show` only returns the bound method
spark.sql("select x1,min(x3) from abc group by x1").show()

+----+------+---+----+---+----+-----+
|  x1|    x2| x3|  x4| y1|  y2|   y3|
+----+------+---+----+---+----+-----+
|   a| apple|  1| 2.4|  1| yes| true|
|   a|  null|  1|null|  0|  no| true|
|   b|orange|  2| 3.5|  1|  no|false|
|   b|orange|  2| 1.4|  0|null| null|
|null| peach|  2| 2.1|  0|null|false|
|   c|  null|  4| 1.5|  1| yes| true|
|   d| apple|  1| 3.0|  1|    |false|
|   d|orange|  2| 2.0|  0| yes|false|
|   a| apple|  1| 2.4|  1| yes| true|
|   a|  null|  1|null|  0|  no| true|
|   b|orange|  2| 3.5|  1|  no|false|
|   b|orange|  2| 1.4|  0|null| null|
|null| peach|  2| 2.1|  0|null|false|
|   c|  null|  4| 1.5|  1| yes| true|
|   d| apple|  1| 3.0|  1|    |false|
|   d|orange|  2| 2.0|  0| yes|false|
|   a| apple|  1| 2.4|  1| yes| true|
|   a|  null|  1|null|  0|  no| true|
|   b|orange|  2| 3.5|  1|  no|false|
|   b|orange|  2| 1.4|  0|null| null|
+----+------+---+----+---+----+-----+
only showing top 20 rows

+---+------+---+---+---+---+-----+
| x1|    x2| x3| x4| y1| y2

In [None]:
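# The queries below need a second temp view named "cde". None is registered
# above, so they raise AnalysisException until one is created, for example
# with uxt.createOrReplaceTempView("cde").
# Once table aliases (a, f) are declared, columns must be referenced through them.
spark.sql("select a.x1 as x1, f.x1 as xz1 from abc a, cde f where a.x3 = f.x3").show()

In [None]:
spark.sql("select abc.x1, cde.x2 from abc, cde where abc.x3 = cde.x3").show()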

AnalysisException: ignored

In [None]:
spark.sql("select * from abc where x3 > 1 and y1 = 1").show()

In [None]:
spark.sql("select abc.*,cde.* from abc,cde where abc.x1=cde.x1").show()