In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=9d8f98fa5fd08a60392694a1020f2f922d53d660d1369dff9750883047e14361
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [5]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [7]:
# Create a DataFrame from a list of Row objects; no schema is passed,
# so Spark infers the column types from the values.
from datetime import date, datetime

import pandas as pd  # not used in this cell; the pandas-based cell below relies on it
from pyspark.sql import Row

sample_rows = [
    Row(apple=1, banana=2.0, c="hello", d=date(2021, 2, 4), e=datetime(2021, 2, 12, 1)),
    Row(apple=2, banana=3.0, c="hello1", d=date(2021, 1, 4), e=datetime(2021, 2, 12, 0)),
    Row(apple=3, banana=4.0, c="hello2", d=date(2021, 2, 5), e=datetime(2021, 2, 11, 1)),
]
df = spark.createDataFrame(sample_rows)
df


DataFrame[apple: bigint, banana: double, c: string, d: date, e: timestamp]

In [8]:
# Create a Spark DataFrame from a pandas DataFrame.
pandas_df = pd.DataFrame(
    {
        "apple": [1, 2, 3],
        "banana": [2.0, 3.0, 3.0],
        "c": ["hello", "hello1", "hello2"],
        # Every row shares the same date and timestamp.
        "d": [date(2021, 2, 4)] * 3,
        "e": [datetime(2021, 2, 12, 1)] * 3,
    }
)
df = spark.createDataFrame(pandas_df)
df


DataFrame[apple: bigint, banana: double, c: string, d: date, e: timestamp]

In [10]:
# Create a DataFrame from an RDD built with SparkContext.parallelize().
# The RDD holds plain tuples, so the column names are supplied via `schema`.
column_names = ["apple", "banana", "c", "d", "e"]
records = [
    (1, 2.0, "hello", date(2021, 2, 4), datetime(2021, 2, 12, 1)),
    (2, 3.0, "hello", date(2021, 2, 4), datetime(2021, 2, 12, 1)),
    (3, 3.0, "hello", date(2021, 2, 4), datetime(2021, 2, 12, 1)),
]
rdd = spark.sparkContext.parallelize(records)
df = spark.createDataFrame(rdd, schema=column_names)
df


DataFrame[apple: bigint, banana: double, c: string, d: date, e: timestamp]

In [13]:
# Print the DataFrame's rows as a text table, then print its schema
# (column names, types and nullability).
df.show()
df.printSchema()

+-----+------+-----+----------+-------------------+
|apple|banana|    c|         d|                  e|
+-----+------+-----+----------+-------------------+
|    1|   2.0|hello|2021-02-04|2021-02-12 01:00:00|
|    2|   3.0|hello|2021-02-04|2021-02-12 01:00:00|
|    3|   3.0|hello|2021-02-04|2021-02-12 01:00:00|
+-----+------+-----+----------+-------------------+

root
 |-- apple: long (nullable = true)
 |-- banana: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [15]:
# Print only the first 2 rows of the DataFrame.
df.show(n=2)

+-----+------+-----+----------+-------------------+
|apple|banana|    c|         d|                  e|
+-----+------+-----+----------+-------------------+
|    1|   2.0|hello|2021-02-04|2021-02-12 01:00:00|
|    2|   3.0|hello|2021-02-04|2021-02-12 01:00:00|
+-----+------+-----+----------+-------------------+
only showing top 2 rows



In [18]:
# Print the first row vertically (one field per line); useful when rows are
# too long to show horizontally.
df.show(1, vertical=True)

-RECORD 0---------------------
 apple  | 1                   
 banana | 2.0                 
 c      | hello               
 d      | 2021-02-04          
 e      | 2021-02-12 01:00:00 
only showing top 1 row



In [20]:
# List the DataFrame's column names.
df.columns

['apple', 'banana', 'c', 'd', 'e']

In [22]:
# Summary statistics (count, mean, stddev, min, max) for the selected columns.
(
    df.select("apple", "banana", "c")
    .describe()
    .show()
)

+-------+-----+------------------+-----+
|summary|apple|            banana|    c|
+-------+-----+------------------+-----+
|  count|    3|                 3|    3|
|   mean|  2.0|2.6666666666666665| null|
| stddev|  1.0|0.5773502691896257| null|
|    min|    1|               2.0|hello|
|    max|    3|               3.0|hello|
+-------+-----+------------------+-----+



In [24]:
# collect() returns ALL rows of the DataFrame as a local Python list of Row
# objects. Everything is brought into this Python process, so only use it
# on small DataFrames.
df.collect()

[Row(apple=1, banana=2.0, c='hello', d=datetime.date(2021, 2, 4), e=datetime.datetime(2021, 2, 12, 1, 0)),
 Row(apple=2, banana=3.0, c='hello', d=datetime.date(2021, 2, 4), e=datetime.datetime(2021, 2, 12, 1, 0)),
 Row(apple=3, banana=3.0, c='hello', d=datetime.date(2021, 2, 4), e=datetime.datetime(2021, 2, 12, 1, 0))]

In [26]:
# take(1) returns only the first row, as a one-element Python list of Row objects.
df.take(1)

[Row(apple=1, banana=2.0, c='hello', d=datetime.date(2021, 2, 4), e=datetime.datetime(2021, 2, 12, 1, 0))]

In [29]:
# Convert the Spark DataFrame to a pandas-on-Spark DataFrame (pandas-style API).
# NOTE(review): newer PySpark releases expose this as `pandas_api()` — confirm before upgrading.
df.to_pandas_on_spark()

Unnamed: 0,apple,banana,c,d,e
0,1,2.0,hello,2021-02-04,2021-02-12 01:00:00
1,2,3.0,hello,2021-02-04,2021-02-12 01:00:00
2,3,3.0,hello,2021-02-04,2021-02-12 01:00:00


In [32]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

# A column reference, a function applied to a column, and a column predicate
# are all instances of one and the same type: the set of their types has
# exactly one member.
len({type(df.c), type(upper(df.c)), type(df.c.isNull())}) == 1

True

In [33]:
# Column "c" as stored, before upper-casing.
df.select("c").show()

+-----+
|    c|
+-----+
|hello|
|hello|
|hello|
+-----+



In [34]:
# After upper-casing: withColumn() returns a DataFrame that carries the
# upper-cased values in an additional column named "upper_c".
df.withColumn("upper_c", upper(df["c"])).show()

+-----+------+-----+----------+-------------------+-------+
|apple|banana|    c|         d|                  e|upper_c|
+-----+------+-----+----------+-------------------+-------+
|    1|   2.0|hello|2021-02-04|2021-02-12 01:00:00|  HELLO|
|    2|   3.0|hello|2021-02-04|2021-02-12 01:00:00|  HELLO|
|    3|   3.0|hello|2021-02-04|2021-02-12 01:00:00|  HELLO|
+-----+------+-----+----------+-------------------+-------+



In [36]:
# Keep only the rows whose "apple" value equals 1.
df.filter(df["apple"] == 1).show()

+-----+------+-----+----------+-------------------+
|apple|banana|    c|         d|                  e|
+-----+------+-----+----------+-------------------+
|    1|   2.0|hello|2021-02-04|2021-02-12 01:00:00|
+-----+------+-----+----------+-------------------+



In [38]:
import pandas
from pyspark.sql.functions import pandas_udf


@pandas_udf('long')
def pandas_plus_one(series: pandas.Series) -> pandas.Series:
    """Return the input Series with 1 added to every element.

    Registered as a pandas UDF whose declared return type is 'long'.

    Args:
        series: Values of the column the UDF is applied to.

    Returns:
        A Series of the same length holding ``series + 1``.
    """
    # The annotations use the `pandas` module imported in this cell, so the
    # cell does not depend on a `pd` alias created elsewhere in the notebook.
    return series + 1


# Apply the UDF to the "apple" column and print the result.
df.select(pandas_plus_one(df.apple)).show()

+----------------------+
|pandas_plus_one(apple)|
+----------------------+
|                     2|
|                     3|
|                     4|
+----------------------+

