In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pysparkseries').getOrCreate()
columns = ["Seqno","Quote","id"]
data = [("1", "Be the change that you wish to see in the world",1),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself.",2),
    ("3", "The purpose of our lives is to be happy.",3),
    ("4", "Be cool.",4)]
df1 = spark.createDataFrame(data,columns)
df1.show()

+-----+--------------------+---+
|Seqno|               Quote| id|
+-----+--------------------+---+
|    1|Be the change tha...|  1|
|    2|Everyone thinks o...|  2|
|    3|The purpose of ou...|  3|
|    4|            Be cool.|  4|
+-----+--------------------+---+



In [0]:
df1.show(4,truncate=False)

+-----+-----------------------------------------------------------------------------+---+
|Seqno|Quote                                                                        |id |
+-----+-----------------------------------------------------------------------------+---+
|1    |Be the change that you wish to see in the world                              |1  |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|2  |
|3    |The purpose of our lives is to be happy.                                     |3  |
|4    |Be cool.                                                                     |4  |
+-----+-----------------------------------------------------------------------------+---+



In [0]:
l = [(1,'rama'),(2,None),(3,'sai')]
headers = ["cid","cname"]
df = spark.createDataFrame(l,headers)
display(df)

cid,cname
1,rama
2,
3,sai


In [0]:
df_null = df.where('cname is null')
df_null.show()

+---+-----+
|cid|cname|
+---+-----+
|  2| null|
+---+-----+



In [0]:
df_null = df.where('cname is not null')
df_null.show()

+---+-----+
|cid|cname|
+---+-----+
|  1| rama|
|  3|  sai|
+---+-----+



In [0]:
from pyspark.sql.functions import col
df_null = df.where(col('cname').isNull())
df_null.show()

+---+-----+
|cid|cname|
+---+-----+
|  2| null|
+---+-----+



In [0]:
df_null = df.where(col('cname').isNotNull())
df_null.show()

+---+-----+
|cid|cname|
+---+-----+
|  1| rama|
|  3|  sai|
+---+-----+



In [0]:
dbutils.fs.mounts()

Out[7]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType='sse-s3'),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType='sse-s3'),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='sse-s3')]

In [0]:
spark_df = spark.read.csv('dbfs:/FileStore/Credit_Pyspark.csv',header=True)
spark_df.show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
|        2|   Master|       Rama|         770|
|        2|     Visa|       Rama|        1240|
|        3|   Master|    Krishna|        1140|
|        3|     Visa|    Krishna|        1140|
|        6|   Master|      Srinu|        1240|
|        7|     Visa|      Srinu|        1240|
+---------+---------+-----------+------------+



In [0]:
file_path = 'dbfs:/FileStore/Credit_Pyspark.csv'
file_format = 'csv'
first_row_header = True
inferSchema = True
df_csv = spark.read.format(file_format).option("header",first_row_header).option("inferSchema",inferSchema).load(file_path)
df_csv.show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
|        2|   Master|       Rama|         770|
|        2|     Visa|       Rama|        1240|
|        3|   Master|    Krishna|        1140|
|        3|     Visa|    Krishna|        1140|
|        6|   Master|      Srinu|        1240|
|        7|     Visa|      Srinu|        1240|
+---------+---------+-----------+------------+



In [0]:
#To get first index value
from pyspark.sql.functions import col

l = df_csv.agg({"Credit_Score":"min"}).first()[0]
#print(l)
#To print entire row
df_result = df_csv.filter(col('Credit_Score')==l)
df_result.show()

Object `print(l)` not found.
+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        2|   Master|       Rama|         770|
+---------+---------+-----------+------------+



In [0]:
# group by

#grouping on Card_type with credit Score
df_csv.groupBy("Card_Type").agg({"Credit_Score":"collect_list"}).show(truncate=False)

+---------+--------------------------+
|Card_Type|collect_list(Credit_Score)|
+---------+--------------------------+
|Visa     |[978, 1240, 1140, 1240]   |
|Master   |[978, 770, 1140, 1240]    |
+---------+--------------------------+



In [0]:
#Collect Set Remove the duplicates while grouping
df_csv.groupBy("Card_Type").agg({"Credit_Score":"collect_set"}).show(truncate=False)

+---------+-------------------------+
|Card_Type|collect_set(Credit_Score)|
+---------+-------------------------+
|Visa     |[978, 1240, 1140]        |
|Master   |[978, 1240, 1140, 770]   |
+---------+-------------------------+



In [0]:
import pyspark.sql.functions as F
df_csv.groupBy("Card_Type").agg(F.collect_list("Credit_Score").alias("Creditscore")).show(truncate = False)

+---------+-----------------------+
|Card_Type|Creditscore            |
+---------+-----------------------+
|Visa     |[978, 1240, 1140, 1240]|
|Master   |[978, 770, 1140, 1240] |
+---------+-----------------------+



In [0]:
# to get maximum value

first_row = df_csv.agg({"Credit_Score":"max"}).first()[0]

#to get all record of maximum credit score
df_csv.filter(col("Credit_Score")==first_row).show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        2|     Visa|       Rama|        1240|
|        6|   Master|      Srinu|        1240|
|        7|     Visa|      Srinu|        1240|
+---------+---------+-----------+------------+



In [0]:
import pyspark.sql.functions as f
df_csv.groupBy("Card_Type").agg(F.collect_set("Credit_Score")).show(truncate=False)

+---------+-------------------------+
|Card_Type|collect_set(Credit_Score)|
+---------+-------------------------+
|Visa     |[978, 1240, 1140]        |
|Master   |[978, 1240, 1140, 770]   |
+---------+-------------------------+



In [0]:
#Count approximate Distinct number
df_csv.groupBy("Card_Type").agg({"credit_score":"approx_count_distinct"}).show()

+---------+-----------------------------------+
|Card_Type|approx_count_distinct(credit_score)|
+---------+-----------------------------------+
|     Visa|                                  3|
|   Master|                                  4|
+---------+-----------------------------------+



In [0]:
import pyspark.sql.functions as F

df_csv.groupBy("Card_Type").agg(F.approx_count_distinct("Credit_Score").alias("Creditscore")).show(truncate=False)

+---------+-----------+
|Card_Type|Creditscore|
+---------+-----------+
|Visa     |3          |
|Master   |4          |
+---------+-----------+



In [0]:
#sum of all credit score

df_csv.groupBy("Card_Type").agg({"Credit_Score":"sum"}).show()

+---------+-----------------+
|Card_Type|sum(Credit_Score)|
+---------+-----------------+
|     Visa|             4598|
|   Master|             4128|
+---------+-----------------+



In [0]:
import pyspark.sql.functions as f

df_csv.groupBy("Card_Type").agg(F.sum("Credit_Score").alias("Total_Credit_Score")).show(truncate=False)

+---------+------------------+
|Card_Type|Total_Credit_Score|
+---------+------------------+
|Visa     |4598              |
|Master   |4128              |
+---------+------------------+



In [0]:
import pyspark.sql.functions as F

df_csv.groupBy('Card_Type').agg(F.sum_distinct("Credit_Score").alias("Distinct_Total_Credit_Score")).show(truncate=False)

+---------+---------------------------+
|Card_Type|Distinct_Total_Credit_Score|
+---------+---------------------------+
|Visa     |3358                       |
|Master   |4128                       |
+---------+---------------------------+



In [0]:
df_csv.groupBy("Card_Type").agg({"Credit_Score":"avg"}).show()

+---------+-----------------+
|Card_Type|avg(Credit_Score)|
+---------+-----------------+
|     Visa|           1149.5|
|   Master|           1032.0|
+---------+-----------------+



In [0]:
import pyspark.sql.functions as F

df_csv.groupBy("Card_Type").agg(F.avg("Credit_Score").alias("Average_Credit_Score")).show(truncate=False)

+---------+--------------------+
|Card_Type|Average_Credit_Score|
+---------+--------------------+
|Visa     |1149.5              |
|Master   |1032.0              |
+---------+--------------------+



In [0]:
# count the card type on credit Score

df_csv.groupBy("Card_Type").agg({"Credit_Score":"count"}).show()

+---------+-------------------+
|Card_Type|count(Credit_Score)|
+---------+-------------------+
|     Visa|                  4|
|   Master|                  4|
+---------+-------------------+



In [0]:
import pyspark.sql.functions as F

df_csv.groupBy("Card_Type").agg(F.count("Credit_Score").alias("Count of Credit Score")).show(truncate = False)

+---------+---------------------+
|Card_Type|Count of Credit Score|
+---------+---------------------+
|Visa     |4                    |
|Master   |4                    |
+---------+---------------------+



In [0]:
df_csv.filter("Credit_Name like 'A%'").show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
+---------+---------+-----------+------------+



In [0]:
df_csv.show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
|        2|   Master|       Rama|         770|
|        2|     Visa|       Rama|        1240|
|        3|   Master|    Krishna|        1140|
|        3|     Visa|    Krishna|        1140|
|        6|   Master|      Srinu|        1240|
|        7|     Visa|      Srinu|        1240|
+---------+---------+-----------+------------+



In [0]:
from pyspark.sql.functions import col

df_csv.filter(col("Credit_Name").like('A%')).show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
+---------+---------+-----------+------------+



In [0]:
# Between operator

df_csv.filter("Credit_Score between 700 and 1000").show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
|        2|   Master|       Rama|         770|
+---------+---------+-----------+------------+



In [0]:
from pyspark.sql.functions import col
import pyspark.sql.functions as f

df_csv.filter(col("Credit_Score").between(700,1000)).orderBy("Credit_Score",ascending=True).show()

+---------+---------+-----------+------------+
|Credit_ID|Card_Type|Credit_Name|Credit_Score|
+---------+---------+-----------+------------+
|        2|   Master|       Rama|         770|
|        1|   Master|       Abhi|         978|
|        1|     Visa|       Abhi|         978|
+---------+---------+-----------+------------+



In [0]:
df1 = spark.createDataFrame([('rama','india'),('sai','india'),('rama','india'),('sai','india')],['cname','ccountry'])
df2 = spark.createDataFrame([('david','usa'),('roshan','usa')],['cname','ccountry'])
df3 = spark.createDataFrame([('john','uk'),('miller','uk')],['cname','ccountry'])

In [0]:
print(df1.show())
print(df2.show())
print(df3.show())

+-----+--------+
|cname|ccountry|
+-----+--------+
| rama|   india|
|  sai|   india|
| rama|   india|
|  sai|   india|
+-----+--------+

None
+------+--------+
| cname|ccountry|
+------+--------+
| david|     usa|
|roshan|     usa|
+------+--------+

None
+------+--------+
| cname|ccountry|
+------+--------+
|  john|      uk|
|miller|      uk|
+------+--------+

None


In [0]:
#union operation

final_df =df1.union(df2).union(df3)
final_df.show()

+------+--------+
| cname|ccountry|
+------+--------+
|  rama|   india|
|   sai|   india|
|  rama|   india|
|   sai|   india|
| david|     usa|
|roshan|     usa|
|  john|      uk|
|miller|      uk|
+------+--------+



In [0]:
#reduce the operation, to bypass the again union operation we use this

from pyspark.sql import DataFrame
from functools import reduce

df_lst = [df1,df2,df3]

final_df1 = reduce(DataFrame.unionAll,df_lst)
final_df1.show()

+------+--------+
| cname|ccountry|
+------+--------+
|  rama|   india|
|   sai|   india|
|  rama|   india|
|   sai|   india|
| david|     usa|
|roshan|     usa|
|  john|      uk|
|miller|      uk|
+------+--------+



In [0]:
#reduce the operation, to bypass the again union operation we use this

from pyspark.sql import DataFrame
from functools import reduce

df_lst = [df1,df2,df3]

final_df1 = reduce(DataFrame.union,df_lst)
final_df1.show()

+------+--------+
| cname|ccountry|
+------+--------+
|  rama|   india|
|   sai|   india|
|  rama|   india|
|   sai|   india|
| david|     usa|
|roshan|     usa|
|  john|      uk|
|miller|      uk|
+------+--------+



In [0]:
from pyspark.sql.functions import col

df_null = df.filter(col('cname').isNull())
df_null.show()

+---+-----+
|cid|cname|
+---+-----+
|  2| null|
+---+-----+



In [0]:
df_not

+---+-----+
|cid|cname|
+---+-----+
|  1| rama|
|  2| null|
|  3|  sai|
+---+-----+

