## SparkSession ##

In [1]:
import findspark
findspark.init("D:/spark-3.0.3-bin-hadoop2.7/")

In [2]:
from pyspark.sql import SparkSession


In [3]:
spark = SparkSession.builder.appName("pyspark").master('local').getOrCreate()

In [4]:
spark

In [5]:
spark2 = SparkSession.newSession

In [6]:
spark2

<function pyspark.sql.session.SparkSession.newSession(self)>

In [7]:
print(spark,spark2)

<pyspark.sql.session.SparkSession object at 0x000001F797F41A50> <function SparkSession.newSession at 0x000001F797F19F30>


In [8]:
spark3 = SparkSession.builder.getOrCreate

In [9]:
print(spark)
print(spark2)
print(spark3)

<pyspark.sql.session.SparkSession object at 0x000001F797F41A50>
<function SparkSession.newSession at 0x000001F797F19F30>
<bound method SparkSession.Builder.getOrCreate of <pyspark.sql.session.SparkSession.Builder object at 0x000001F797C16830>>


In [10]:
df = spark.createDataFrame([("Scala", 25000), ("Spark", 35000), ("PHP", 21000)])

In [11]:
df

DataFrame[_1: string, _2: bigint]

In [12]:
df.collect()

[Row(_1='Scala', _2=25000), Row(_1='Spark', _2=35000), Row(_1='PHP', _2=21000)]

In [13]:
df.show()

+-----+-----+
|   _1|   _2|
+-----+-----+
|Scala|25000|
|Spark|35000|
|  PHP|21000|
+-----+-----+



In [14]:
print(spark.catalog.listDatabases())

[Database(name='default', description='default database', locationUri='file:/F:/Data%20science/python/Pyspark/spark-warehouse')]


In [15]:
spark.getActiveSession()

### commonly use SparkSession methods

###### version() – Returns the Spark version where your application is running, probably the Spark version your cluster is configured with.

###### createDataFrame() – This creates a DataFrame from a collection and an RDD

###### getActiveSession() – returns an active Spark session.

###### read() – Returns an instance of DataFrameReader class, this is used to read records from csv, parquet, avro, and more file formats into DataFrame.

###### readStream() – Returns an instance of DataStreamReader class, this is used to read streaming data. that can be used to read streaming data into DataFrame.

###### sparkContext() – Returns a SparkContext.
 
###### sql() – Returns a DataFrame after executing the SQL mentioned.

###### sqlContext() – Returns SQLContext.

###### stop() – Stop the current SparkContext.

###### table() – Returns a DataFrame of a table or view.

###### udf() – Creates a PySpark UDF to use it on DataFrame, Dataset, and SQL.

## sparkContext

In [16]:
spark.version

'3.0.3'

In [17]:
spark.sparkContext

In [18]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [20]:
rdd = spark.sparkContext.parallelize([('java',0),('python',1),('scala',2),('sql',3),('javascript',4),('go',5)])

In [21]:
rdd

ParallelCollectionRDD[13] at readRDDFromFile at PythonRDD.scala:262

In [22]:
print(rdd)

ParallelCollectionRDD[13] at readRDDFromFile at PythonRDD.scala:262


In [23]:
rdd.collect()

[('java', 0),
 ('python', 1),
 ('scala', 2),
 ('sql', 3),
 ('javascript', 4),
 ('go', 5)]

In [29]:
df2 = rdd.toDF(['language','serial_number'])

In [30]:
df2.show()

+----------+-------------+
|  language|serial_number|
+----------+-------------+
|      java|            0|
|    python|            1|
|     scala|            2|
|       sql|            3|
|javascript|            4|
|        go|            5|
+----------+-------------+



In [31]:
rdd1 = spark.sparkContext.range(1,5)

In [32]:
print(rdd1)

PythonRDD[29] at RDD at PythonRDD.scala:53


In [33]:
rdd1.collect()

[1, 2, 3, 4]

In [38]:
rdd2 = spark.sparkContext.parallelize([1,2,3,4,5])
# rdd  = spark.sparkContext.parallelize

In [40]:
rdd2.collect()

[1, 2, 3, 4, 5]

In [41]:
type(rdd1) == type(rdd2)

False

In [43]:
type(rdd2)

pyspark.rdd.RDD

In [46]:
print(spark.sparkContext.applicationId)
print(spark.sparkContext.version)
print(spark.sparkContext.uiWebUrl) 

local-1660039371610
3.0.3
http://host.docker.internal:4040


### when ever we use parallelize method it creates rdd and when we create it with range method it creates pipelined rdd

###  commonly used sparkContext methods

#### accumulator(value[, accum_param]) 
#### broadcast(value) 
#### emptyRDD()
#### getOrCreate() 
#### hadoopFile() 
#### newAPIHadoopFile() 
#### sequenceFile() 
#### setLogLevel() 
#### textFile()
#### union() 
#### wholeTextFiles()

# RDD Concept

In [59]:
rdd4 = spark.sparkContext.textFile("C:/Users/W10/Desktop/salary.txt")

In [51]:
spark.sparkContext.wholeTextFiles("C:/Users/W10/Desktop/salary.txt")

org.apache.spark.api.java.JavaPairRDD@70fdddef

In [54]:
emptRDD = spark.sparkContext.emptyRDD

In [64]:
rdd5 = spark.sparkContext.textFile("C:/Users/W10/Desktop/information.txt",5)

In [65]:
rdd5.getNumPartitions()

5

### sometimes we want to repartition the rdd, for that pyspark provides two methods
### 1. repartition()
### 2. coalesce()

#### repartition shuffle data from all the nodes it is very expensive execution it is also called full shuffle.
#### coalesce() shuffle data from minimum nodes.

In [69]:
rdd6 = rdd5.flatMap(lambda x: x.split(" "))


In [72]:
numrdd = spark.sparkContext.parallelize(range(20),3)

In [73]:
numrdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [79]:
numrdd.map(lambda x: (x,1))


PythonRDD[45] at RDD at PythonRDD.scala:53

In [81]:
rdd7 = numrdd.map(lambda x: x+2)

In [82]:
rdd7.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]

In [83]:
rdd7.getNumPartitions()

3

In [93]:
rdd8 = spark.sparkContext.parallelize([('jack',1),('mack',2),('sack',5),('jack',6),('jack',7),('lonare',8),('dongre',9),])

In [94]:
rdd9 = rdd8.reduceByKey(lambda x,y: x+y)

In [95]:
rdd9.collect()

[('jack', 14), ('mack', 2), ('sack', 5), ('lonare', 8), ('dongre', 9)]

In [100]:
rdd10 = rdd9.filter(lambda x: x[1]<5)

In [102]:
rdd10.collect()

[('mack', 2)]

In [103]:
rdd10.count()

1

In [104]:
rdd6.count()

59

In [105]:
rdd6.collect()


['collections',
 '',
 'their',
 'rolls',
 'is',
 'to',
 'store',
 'the',
 'data',
 '',
 '',
 'VARRAYS',
 '\tarrays',
 '\tpre-defined',
 'size',
 '--',
 'we',
 'have',
 'to',
 'predefind',
 'the',
 'array',
 'size',
 '\tindex',
 'starts',
 'with',
 '1',
 '\tcannot',
 'delete',
 'element',
 '',
 'nested',
 'table',
 '\tList',
 '\tvariable',
 'size',
 '--',
 'we',
 "don't",
 'need',
 'to',
 'specify',
 'the',
 'size',
 '\tindex',
 'starts',
 'with',
 '1',
 '',
 'associative',
 'arrays',
 '\tindexing',
 'can',
 'be',
 'done',
 'with',
 'string',
 '\tmap',
 '(hashmap)']