In [1]:
import datetime
import jdatetime
from pyspark import Row, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

from c2cbet import config
from pyspark.sql import functions
from dputils import pyspark_funcs

In [2]:
spark = SparkSession \
    .builder.appName(config.appName) \
    .master(config.master) \
    .config("spark.sql.warehouse.dir", config.spark_sql_warehouse_dir, SparkConf()) \
    .getOrCreate()

In [3]:
# df = spark.createDataFrame(
#     [(1, "foo"),  # create your data here, be consistent in the types.
#      (2, "bar"),],
#     ["id", "label"]  # add your column names here
# )
data = [('James','Smith','M',3000,1646822155), ('Anna','Rose','F',4100,1646722155),
        ('Robert','Williams','M',6200,1646622155)
        ]
columns = ["firstname","lastname","gender","salary","creationDate"]
df = spark.createDataFrame(data=data, schema = columns)

df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- creationDate: long (nullable = true)

+---------+--------+------+------+------------+
|firstname|lastname|gender|salary|creationDate|
+---------+--------+------+------+------------+
|    James|   Smith|     M|  3000|  1646822155|
|     Anna|    Rose|     F|  4100|  1646722155|
|   Robert|Williams|     M|  6200|  1646622155|
+---------+--------+------+------+------------+



In [4]:
#How to add a constant column in a Spark DataFrame?
# In this PySpark article, I will explain different ways of how to add a new column to DataFrame
# using withColumn(), select(), sql(), Few ways include adding a constant column with a default value,
# derive based out of another column, add a column with NULL/None value, add multiple columns

print(">>> Add new constanct column")
from pyspark.sql.functions import lit
df.withColumn("bonus_percent", lit(0.3)) \
    .show()

>>> Add new constanct column
+---------+--------+------+------+------------+-------------+
|firstname|lastname|gender|salary|creationDate|bonus_percent|
+---------+--------+------+------+------------+-------------+
|    James|   Smith|     M|  3000|  1646822155|          0.3|
|     Anna|    Rose|     F|  4100|  1646722155|          0.3|
|   Robert|Williams|     M|  6200|  1646622155|          0.3|
+---------+--------+------+------+------------+-------------+



In [5]:
print(">>> Add New column with NULL")
df.withColumn("DEFAULT_COL", lit(None)) \
    .show()

# def to_jdate()

>>> Add New column with NULL
+---------+--------+------+------+------------+-----------+
|firstname|lastname|gender|salary|creationDate|DEFAULT_COL|
+---------+--------+------+------+------------+-----------+
|    James|   Smith|     M|  3000|  1646822155|       null|
|     Anna|    Rose|     F|  4100|  1646722155|       null|
|   Robert|Williams|     M|  6200|  1646622155|       null|
+---------+--------+------+------+------------+-----------+



In [16]:
print(">>> Add column from existing column")
# jfrom_unixtime=functions.udf(lambda ts: jdatetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') , StringType() )
df.withColumn("bonus_amount", df.salary*0.3) \
    .withColumn("creationDate_d", functions.from_unixtime(df.creationDate)) \
    .withColumn("creationDate_j", pyspark_funcs.jfrom_unixtime(df.creationDate)) \
    .show()

>>> Add column from existing column
+---------+--------+------+------+------------+------------+-------------------+-------------------+
|firstname|lastname|gender|salary|creationDate|bonus_amount|     creationDate_d|     creationDate_j|
+---------+--------+------+------+------------+------------+-------------------+-------------------+
|    James|   Smith|     M|  3000|  1646822155|       900.0|2022-03-09 14:05:55|1400-12-18 14:05:55|
|     Anna|    Rose|     F|  4100|  1646722155|      1230.0|2022-03-08 10:19:15|1400-12-17 10:19:15|
|   Robert|Williams|     M|  6200|  1646622155|      1860.0|2022-03-07 06:32:35|1400-12-16 06:32:35|
+---------+--------+------+------+------------+------------+-------------------+-------------------+



In [None]:
print(">>> Add column by concatinating existing columns")
from pyspark.sql.functions import concat_ws
df.withColumn("name", concat_ws(",","firstname",'lastname')) \
    .show()


In [None]:
print(">>> Add Column Value Based on Condition")
from pyspark.sql.functions import when
df.withColumn("grade",
              when((df.salary < 4000), lit("A"))
              .when((df.salary >= 4000) & (df.salary <= 5000), lit("B"))
              .otherwise(lit("C"))
              ).show()

In [None]:
print(">>> Add Column When not Exists on DataFrame")
if 'dummy' not in df.columns:
    df.withColumn("dummy",lit(None)) \
        .show()

In [None]:
print(">>>  Add Multiple Columns using Map")
# Let's assume DF has just 3 columns c1,c2,c3
# apply transformation on these columns and derive multiple columns
# and store these column vlaues into c5,c6,c7,c8,c9,10

# df2 = df.rdd.map(row=>{(c1,c2,c5,c6,c7,c8,c9,c10)})

In [3]:
spark.sql("select channel,count(*) from c2c_bet group by channel").show()



AnalysisException: Table or view not found: c2c_bet; line 1 pos 29;
'Aggregate ['channel], ['channel, unresolvedalias(count(1), None)]
+- 'UnresolvedRelation [c2c_bet], [], false
