## **Selecting and Renaming Columns in SparkDataFrame**

In [1]:
import findspark


findspark.init()
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

spark

In [2]:
from pyspark.sql.functions import explode_outer
from pyspark.sql import Row
import datetime
import pandas as pd

In [3]:
usr_Struct=[
    {'id':1,
     'name':"asfand",
     'last': "saeed",
     'is_customer':True,
     'phone': Row(first="+92329677783", Second="+92342454672"),
     'update': datetime.datetime(2022,4,12,4,10,0)
    },
    {'id':2,
     'name':"saeed",
     'last': "asfand",
     'is_customer':True,
     'phone': Row(first="+92329622283", Second="+92342442312"),
     'update': datetime.datetime(2021,4,11,4,10,0)},
    {'id':3,
     'name':"ali",
     'last':"khan",
     'is_customer':False,
     'phone': Row(first="+92329644483", Second="+92342445552"), ## Predefined structure as a Row
     'update': datetime.datetime(2021,4,22,12,10,0)},
    {'id':4,
     'name':"khan",
     'last':"syed",
     'is_customer':False,
     'phone': Row(first=None,Second=None),#['+92329633456','+9234247654'],
     'update': None #datetime.datetime(2022,5,1,10,10,0)}
    }
]

In [4]:
spp1=spark.createDataFrame(pd.DataFrame(usr_Struct))

In [5]:
spp1.show(truncate=False)

+---+------+------+-----------+----------------------------+-------------------+
|id |name  |last  |is_customer|phone                       |update             |
+---+------+------+-----------+----------------------------+-------------------+
|1  |asfand|saeed |true       |{+92329677783, +92342454672}|2022-04-12 04:10:00|
|2  |saeed |asfand|true       |{+92329622283, +92342442312}|2021-04-11 04:10:00|
|3  |ali   |khan  |false      |{+92329644483, +92342445552}|2021-04-22 12:10:00|
|4  |khan  |syed  |false      |{null, null}                |null               |
+---+------+------+-----------+----------------------------+-------------------+



In [12]:
help(spp1.select)

Help on method select in module pyspark.sql.dataframe:

select(*cols) method of pyspark.sql.dataframe.DataFrame instance
    Projects a set of expressions and returns a new :class:`DataFrame`.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    cols : str, :class:`Column`, or list
        column names (string) or expressions (:class:`Column`).
        If one of the column names is '*', that column is expanded to include all columns
        in the current :class:`DataFrame`.
    
    Examples
    --------
    >>> df.select('*').collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name='Alice', age=12), Row(name='Bob', age=15)]



In [59]:
spp1.select ('*').show(truncate=False)

+---+------+------+-----------+----------------------------+-------------------+
|id |name  |last  |is_customer|phone                       |update             |
+---+------+------+-----------+----------------------------+-------------------+
|1  |asfand|saeed |true       |{+92329677783, +92342454672}|2022-04-12 04:10:00|
|2  |saeed |asfand|true       |{+92329622283, +92342442312}|2021-04-11 04:10:00|
|3  |ali   |khan  |false      |{+92329644483, +92342445552}|2021-04-22 12:10:00|
|4  |khan  |syed  |false      |{null, null}                |null               |
+---+------+------+-----------+----------------------------+-------------------+



In [60]:
spp1.select ('*',col('phone.first')).show()

+---+------+------+-----------+--------------------+-------------------+------------+
| id|  name|  last|is_customer|               phone|             update|       first|
+---+------+------+-----------+--------------------+-------------------+------------+
|  1|asfand| saeed|       true|{+92329677783, +9...|2022-04-12 04:10:00|+92329677783|
|  2| saeed|asfand|       true|{+92329622283, +9...|2021-04-11 04:10:00|+92329622283|
|  3|   ali|  khan|      false|{+92329644483, +9...|2021-04-22 12:10:00|+92329644483|
|  4|  khan|  syed|      false|        {null, null}|               null|        null|
+---+------+------+-----------+--------------------+-------------------+------------+



In [61]:
spp1.alias('u').select('u.*').show()

+---+------+------+-----------+--------------------+-------------------+
| id|  name|  last|is_customer|               phone|             update|
+---+------+------+-----------+--------------------+-------------------+
|  1|asfand| saeed|       true|{+92329677783, +9...|2022-04-12 04:10:00|
|  2| saeed|asfand|       true|{+92329622283, +9...|2021-04-11 04:10:00|
|  3|   ali|  khan|      false|{+92329644483, +9...|2021-04-22 12:10:00|
|  4|  khan|  syed|      false|        {null, null}|               null|
+---+------+------+-----------+--------------------+-------------------+



In [62]:
## col is used when you have to apply function on columns
spp1.alias('u').select(['u.id','u.name',col('is_customer')]).show()

+---+------+-----------+
| id|  name|is_customer|
+---+------+-----------+
|  1|asfand|       true|
|  2| saeed|       true|
|  3|   ali|      false|
|  4|  khan|      false|
+---+------+-----------+



In [64]:
from pyspark.sql.functions import col,lit,concat

In [67]:
spp1.alias('u').select (concat (col('name'),lit(', '),col('last')).alias('full_name')).show()

+-------------+
|    full_name|
+-------------+
|asfand, saeed|
|saeed, asfand|
|    ali, khan|
|   khan, syed|
+-------------+



### **SelectExpr on Spark Data Frame**

In [69]:
help(spp1.selectExpr)

Help on method selectExpr in module pyspark.sql.dataframe:

selectExpr(*expr) method of pyspark.sql.dataframe.DataFrame instance
    Projects a set of SQL expressions and returns a new :class:`DataFrame`.
    
    This is a variant of :func:`select` that accepts SQL expressions.
    
    .. versionadded:: 1.3.0
    
    Examples
    --------
    >>> df.selectExpr("age * 2", "abs(age)").collect()
    [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]



In [71]:
## We use the 
spp1.alias('u').selectExpr('u.*').show(truncate=False)



+---+------+------+-----------+----------------------------+-------------------+
|id |name  |last  |is_customer|phone                       |update             |
+---+------+------+-----------+----------------------------+-------------------+
|1  |asfand|saeed |true       |{+92329677783, +92342454672}|2022-04-12 04:10:00|
|2  |saeed |asfand|true       |{+92329622283, +92342442312}|2021-04-11 04:10:00|
|3  |ali   |khan  |false      |{+92329644483, +92342445552}|2021-04-22 12:10:00|
|4  |khan  |syed  |false      |{null, null}                |null               |
+---+------+------+-----------+----------------------------+-------------------+



In [72]:
from pyspark.sql.functions import col,lit,concat

In [82]:
spp1.alias('u').selectExpr('id',"concat(name, ', ',last ) as full_name").show()

+---+-------------+
| id|    full_name|
+---+-------------+
|  1|asfand, saeed|
|  2|saeed, asfand|
|  3|    ali, khan|
|  4|   khan, syed|
+---+-------------+



In [83]:
spp1.createOrReplaceTempView('users')

In [86]:
spark.sql("""
          select id,concat(name, ', ' ,last ) as full_name from users
          
          """
          ).\
              show()

+---+-------------+
| id|    full_name|
+---+-------------+
|  1|asfand, saeed|
|  2|saeed, asfand|
|  3|    ali, khan|
|  4|   khan, syed|
+---+-------------+



### **Columns Using Spark Data Frame Names**
* spp1['id'] and col('id') will return column type object.
* We can't use col() as selectExpr, We can pass strings only we cant pass col type objects as we did in past.

In [89]:
spp1['id']

Column<'id'>

In [90]:
col('id')

Column<'id'>

In [91]:
type(spp1['id'])

pyspark.sql.column.Column

In [88]:
spp1.select(spp1['id']).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+



In [92]:
spp1.select(spp1['id'],col('name')).show()

+---+------+
| id|  name|
+---+------+
|  1|asfand|
|  2| saeed|
|  3|   ali|
|  4|  khan|
+---+------+



In [96]:
## 
spp1.alias('u').select(u['id']).show()

NameError: name 'u' is not defined

In [97]:
spp1.alias('u').select('u.id',col('name'),'last').show()

+---+------+------+
| id|  name|  last|
+---+------+------+
|  1|asfand| saeed|
|  2| saeed|asfand|
|  3|   ali|  khan|
|  4|  khan|  syed|
+---+------+------+



In [None]:
spp1.selectExpr(col('id'),'name').show()

In [103]:
spp1.alias('u').selectExpr('u.id',"concat(u.name, ', ',u.last) as full").show()

+---+-------------+
| id|         full|
+---+-------------+
|  1|asfand, saeed|
|  2|saeed, asfand|
|  3|    ali, khan|
|  4|   khan, syed|
+---+-------------+



In [104]:
spp1.createOrReplaceTempView('users')

In [107]:
spark.sql('''
          select id,concat(u.name,', ', u.last) as full_name
          from users as u
          '''
).show()

+---+-------------+
| id|    full_name|
+---+-------------+
|  1|asfand, saeed|
|  2|saeed, asfand|
|  3|    ali, khan|
|  4|   khan, syed|
+---+-------------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 64191)
Traceback (most recent call last):
  File "c:\Users\Asfandyar\AppData\Local\Programs\Python\Python310\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\Users\Asfandyar\AppData\Local\Programs\Python\Python310\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "c:\Users\Asfandyar\AppData\Local\Programs\Python\Python310\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\Users\Asfandyar\AppData\Local\Programs\Python\Python310\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "c:\Users\Asfandyar\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\accumulators.py", line 262, in handle
    poll(accum_updates)
  File "c:\Users\Asfandyar\AppData\Local\Progr