In [5]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=fa3c531e41c0287d826e20e37aa9ad87a59efedc77001916f767b69af340a6dd
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [6]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType,IntegerType,StructType,StructField,FloatType
from pyspark.sql.functions import when, col, udf

In [7]:
# Start (or reuse) the session for this notebook and keep a handle on its
# SparkContext for RDD operations.
spark = (
    SparkSession.builder
    .appName("exp")
    .getOrCreate()
)
sc = spark.sparkContext

In [8]:
@udf(returnType=StringType())
def get_english_name(val):
    """Extract the English common name from a "Name (Scientific name)" label.

    Args:
        val: Species label such as "Greenfinch (Chloris chloris)", or None
            when the source column is null.

    Returns:
        The text before the first " (" separator. When the separator is
        absent the whole label is returned unchanged; None yields None.
    """
    # Null column values reach a Python UDF as None; pass them through
    # instead of failing the whole job with an AttributeError.
    if val is None:
        return None
    # partition() never raises, unlike index() on a label without parentheses.
    return val.partition(" (")[0]

@udf(returnType=IntegerType())
def get_start_year(val):
    """Parse the first year out of a "(YYYY-YYYY)" period string.

    Args:
        val: Period text such as "(1970-2014)", or None when the source
            column is null.

    Returns:
        The four characters after the opening parenthesis as an int, or
        None when ``val`` is None.

    Raises:
        ValueError: If characters 1-4 of ``val`` are not a valid integer.
    """
    # Null column values reach a Python UDF as None; slicing None would
    # raise TypeError and abort the job.
    if val is None:
        return None
    # Skip the leading "(" and read the four-digit start year.
    return int(val[1:5])

@udf(returnType=StringType())
def get_trend(x):
    """Classify an annual percentage change into a trend label.

    Bands (half-open, so every value lands in exactly one band):
        x < -3.00           -> "strong decline"
        -3.00 <= x < -0.50  -> "weak decline"
        -0.50 <= x < 0.50   -> "no change"
        x >= 0.50           -> "strong increase"

    Args:
        x: Annual percentage change, or None when the source column is null.

    Returns:
        The trend label, or None when ``x`` is None.
    """
    # Null column values reach a Python UDF as None; comparing None with a
    # float would raise TypeError.
    if x is None:
        return None
    # Ordered upper-bound checks: the previous strict two-sided comparisons
    # left exactly -3.00 and -0.50 unmatched, so those declines fell through
    # to "strong increase".
    if x < -3.00:
        return "strong decline"
    if x < -0.50:
        return "weak decline"
    if x < 0.50:
        return "no change"
    # NOTE(review): there is no "weak increase" band mirroring "weak decline";
    # every value >= 0.50 is reported as a strong increase. Confirm intended.
    return "strong increase"

In [9]:
# Sample rows in schema order: species label, category, survey period,
# annual percentage change.
info = [
    ("Greenfinch (Chloris chloris)", "Farmland birds", "(1970-2014)", -1.13),
    ("Siskin (Carduelis spinus)", "Woodland birds", "(1995-2014)", 2.26),
    # Scientific name corrected from the misspelt "artistotelis".
    ("European shag (Phalacrocorax aristotelis)", "Seabirds", "(1986-2014)", -2.31),
    ("Mute Swan (Cygnus olor)", "Water and wetland birds", "(1975-2014)", 1.65),
    ("Collared Dove (Streptopelia decaocto)", "other", "(1970-2014)", 5.2),
]
# Explicit schema for the sample rows; field order matches the tuple order.
schema1 = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("Species", StringType()),
        ("Category", StringType()),
        ("Period", StringType()),
        ("Annual_percentage_change", FloatType()),
    )
])

In [10]:
# Distribute the in-memory rows as an RDD, then build a DataFrame from it
# using the explicit schema rather than letting Spark infer column types.
rdd = sc.parallelize(info)
data = spark.createDataFrame(rdd, schema=schema1)

In [11]:
# Add three derived columns, each computed by one of the UDFs above.
# NOTE(review): "start_yearn" looks like a typo for "start_year"; the name is
# kept as-is so the output schema does not change.
data2 = (
    data
    .withColumn("English_Name", get_english_name(col("Species")))
    .withColumn("start_yearn", get_start_year(col("Period")))
    .withColumn("Trend", get_trend(col("Annual_percentage_change")))
)


In [12]:
# Print the enriched rows; this action is what actually runs the UDFs.
data2.show()


+--------------------+--------------------+-----------+------------------------+-------------+-----------+---------------+
|             Species|            Category|     Period|Annual_percentage_change| English_Name|start_yearn|          Trend|
+--------------------+--------------------+-----------+------------------------+-------------+-----------+---------------+
|Greenfinch (Chlor...|      Farmland birds|(1970-2014)|                   -1.13|   Greenfinch|       1970|   weak decline|
|Siskin (Carduelis...|      Woodland birds|(1995-2014)|                    2.26|       Siskin|       1995|strong increase|
|European shag (Ph...|            Seabirds|(1986-2014)|                   -2.31|European shag|       1986|   weak decline|
|Mute Swan (Cygnus...|Water and wetland...|(1975-2014)|                    1.65|    Mute Swan|       1975|strong increase|
|Collared Dove (St...|               other|(1970-2014)|                     5.2|Collared Dove|       1970|strong increase|
+---------------

In [None]:
# Shut down the Spark session; `spark` and `sc` are unusable after this call.
spark.stop()

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType,IntegerType,StructType,StructField,FloatType
from pyspark.sql.functions import when, col, udf
# Start (or reuse) the session for this run and keep a handle on its
# SparkContext for RDD operations.
spark = (
    SparkSession.builder
    .appName("exp")
    .getOrCreate()
)
sc = spark.sparkContext
@udf(returnType=StringType())
def get_english_name(val):
    """Extract the English common name from a "Name (Scientific name)" label.

    Args:
        val: Species label such as "Greenfinch (Chloris chloris)", or None
            when the source column is null.

    Returns:
        The text before the first " (" separator. When the separator is
        absent the whole label is returned unchanged; None yields None.
    """
    # Null column values reach a Python UDF as None; pass them through
    # instead of failing the whole job with an AttributeError.
    if val is None:
        return None
    # partition() never raises, unlike index() on a label without parentheses.
    return val.partition(" (")[0]

@udf(returnType=IntegerType())
def get_start_year(val):
    """Parse the first year out of a "(YYYY-YYYY)" period string.

    Args:
        val: Period text such as "(1970-2014)", or None when the source
            column is null.

    Returns:
        The four characters after the opening parenthesis as an int, or
        None when ``val`` is None.

    Raises:
        ValueError: If characters 1-4 of ``val`` are not a valid integer.
    """
    # Null column values reach a Python UDF as None; slicing None would
    # raise TypeError and abort the job.
    if val is None:
        return None
    # Skip the leading "(" and read the four-digit start year.
    return int(val[1:5])

@udf(returnType=StringType())
def get_trend(x):
    """Classify an annual percentage change into a trend label.

    Bands (half-open, so every value lands in exactly one band):
        x < -3.00           -> "strong decline"
        -3.00 <= x < -0.50  -> "weak decline"
        -0.50 <= x < 0.50   -> "no change"
        x >= 0.50           -> "strong increase"

    Args:
        x: Annual percentage change, or None when the source column is null.

    Returns:
        The trend label, or None when ``x`` is None.
    """
    # Null column values reach a Python UDF as None; comparing None with a
    # float would raise TypeError.
    if x is None:
        return None
    # Ordered upper-bound checks: the previous strict two-sided comparisons
    # left exactly -3.00 and -0.50 unmatched, so those declines fell through
    # to "strong increase".
    if x < -3.00:
        return "strong decline"
    if x < -0.50:
        return "weak decline"
    if x < 0.50:
        return "no change"
    # NOTE(review): there is no "weak increase" band mirroring "weak decline";
    # every value >= 0.50 is reported as a strong increase. Confirm intended.
    return "strong increase"

# Sample rows in schema order: species label, category, survey period,
# annual percentage change.
info = [
    ("Greenfinch (Chloris chloris)", "Farmland birds", "(1970-2014)", -1.13),
    ("Siskin (Carduelis spinus)", "Woodland birds", "(1995-2014)", 2.26),
    # Scientific name corrected from the misspelt "artistotelis".
    ("European shag (Phalacrocorax aristotelis)", "Seabirds", "(1986-2014)", -2.31),
    ("Mute Swan (Cygnus olor)", "Water and wetland birds", "(1975-2014)", 1.65),
    ("Collared Dove (Streptopelia decaocto)", "other", "(1970-2014)", 5.2),
]
# Explicit schema for the sample rows; field order matches the tuple order.
schema1 = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("Species", StringType()),
        ("Category", StringType()),
        ("Period", StringType()),
        ("Annual_percentage_change", FloatType()),
    )
])

# Run the whole pipeline inside try/finally so the Spark session is always
# stopped, even when building or showing the DataFrame raises.
try:
    # Distribute the in-memory rows and apply the explicit schema.
    rdd = sc.parallelize(info)
    data = spark.createDataFrame(rdd, schema=schema1)

    # Add the three derived columns computed by the UDFs.
    # NOTE(review): "start_yearn" looks like a typo for "start_year"; the name
    # is kept as-is so the output schema does not change.
    data2 = (
        data
        .withColumn("English_Name", get_english_name(col("Species")))
        .withColumn("start_yearn", get_start_year(col("Period")))
        .withColumn("Trend", get_trend(col("Annual_percentage_change")))
    )
    data2.show()
finally:
    spark.stop()