<a href="https://colab.research.google.com/github/alifzl/Zhaav-MINER-Scripts/blob/main/exm01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
from typing import Union
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.pandas import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# create sparksesion in order to use spark sql:

In [2]:
def createSparkSession(appname:str):
        '''
        create spark session rdd.
            Parameters:
            ---------- 
                    appname (str): The appliction name  
                    

            Returns:
                    SparkSession

        '''
        
        import pyspark
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.appName(appname).getOrCreate()
        return spark

In [70]:
spark=createSparkSession("exp01")

## Read CSV file into DataFrame with schema and header


In [3]:
def readcsvfile(header:str,inferSchema:str,appname:str,path:str,sep:str):
        '''
        create dataframe by reading csv file.
            Parameters:
            ---------- 
                    header(str):  auto detect header of the file  ,
                  
                    inferSchema(str):  auto detect type of columns,
                    
                    appname(str):  The appliction name,
                    
                    path(str):    file path
                    
                    sep(str): columns separator

            Returns
        --------------
               dataframe( :class:`DataFrame) : return dataframe out of csv file

        '''
        spark=createSparkSession(appname)
        dataframe=spark.read.csv(path,header=header,inferSchema=inferSchema,sep=sep) 
        return dataframe

In [6]:
df=readcsvfile(header=True,inferSchema=True,appname="exp01",path='/content/diabetes.csv',sep=',')

In [90]:
def show(dataframe,number:int=None,truncate:bool=None):
        '''
        show the dataframe .
        
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe
                    
            number(int) : optional, number of rows to show.
            
            truncate : bool, optional
            If set to ``True``, truncate strings longer than 20 chars by default.
            If set to a number greater than one, truncates long strings to length ``truncate``
            and align cells right.
            
            Returns
        --------------
               """Prints the first ``n`` rows to the console.

    

        

                '''
        dataframe.show(number,truncate)

## common command

### show the dataframe

In [8]:
show(df,5,False)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI |DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|6          |148    |72           |35           |0      |33.6|0.627                   |50 |1      |
|1          |85     |66           |29           |0      |26.6|0.351                   |31 |0      |
|8          |183    |64           |0            |0      |23.3|0.672                   |32 |1      |
|1          |89     |66           |23           |94     |28.1|0.167                   |21 |0      |
|0          |137    |40           |35           |168    |43.1|2.288                   |33 |1      |
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



### type of dataframe column

In [9]:
def dataframetype(dataframe):
    """Returns all column names and their data types as a list
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe
                    
        Returns
        --------------
               Prints types of the rows """
    return dataframe.dtypes

In [10]:
dataframetype(df)

[('Pregnancies', 'int'),
 ('Glucose', 'int'),
 ('BloodPressure', 'int'),
 ('SkinThickness', 'int'),
 ('Insulin', 'int'),
 ('BMI', 'double'),
 ('DiabetesPedigreeFunction', 'double'),
 ('Age', 'int'),
 ('Outcome', 'int')]

### statistic describe dataframe

In [11]:
def statisticdescribe(dataframe):
        """Returns statistics summary of dataframe
        
        Parameters:
        --------------
        dataframe(:class:`DataFrame) : select dataframe

        Returns
        --------------
              :class:`DataFrame"""
        dataframe_new=dataframe.describe()
        return dataframe_new

In [12]:
statisticdescribe(df).show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

### list of the columns

In [13]:
def coloumes(dataframe):
    """Returns all column names  as a list
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe
                    
        Returns
        --------------
               Prints all column names """
    return dataframe.columns

In [14]:
coloumes(df)

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

### select and show some columns

In [15]:
def select(dataframe,coloumes:Union[list, str]):
    """select specfic columns by name and return new dataframe
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe
            
            coloumes(str/list): list of string coloumes names         

        Returns
        --------------
              dataframe_new(:class:`DataFrame ): return new dataframe
              
              
        Example
        -------------- 
            1- common select 
                
                dataframe_new=dataframe.select("name")


            
            
            2- also can slicing the dataframe with start and end
            
                dataframe_new=dataframe.select(dataframe.columns[start=10:end=20])"""
    dataframe_new=dataframe.select(coloumes)
    return dataframe_new

In [None]:
select(df,["Pregnancies","Glucose","BloodPressure"]).show()

+-----------+-------+-------------+
|Pregnancies|Glucose|BloodPressure|
+-----------+-------+-------------+
|          6|    148|           72|
|          1|     85|           66|
|          8|    183|           64|
|          1|     89|           66|
|          0|    137|           40|
|          5|    116|           74|
|          3|     78|           50|
|         10|    115|            0|
|          2|    197|           70|
|          8|    125|           96|
|          4|    110|           92|
|         10|    168|           74|
|         10|    139|           80|
|          1|    189|           60|
|          5|    166|           72|
|          7|    100|            0|
|          0|    118|           84|
|          7|    107|           74|
|          1|    103|           30|
|          1|    115|           70|
+-----------+-------+-------------+
only showing top 20 rows



### list of all rows

In [16]:
def collect(dataframe)->list:
  """ return all of dataframe as list
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe
            
                   

        Returns
        --------------
              ListData(list): list of all dataframe """

  ListData=dataframe.collect()
  return ListData

In [17]:
collect(df)[0:5]

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1),
 Row(Pregnancies=1, Glucose=89, BloodPressure=66, SkinThickness=23, Insulin=94, BMI=28.1, DiabetesPedigreeFunction=0.167, Age=21, Outcome=0),
 Row(Pregnancies=0, Glucose=137, BloodPressure=40, SkinThickness=35, Insulin=168, BMI=43.1, DiabetesPedigreeFunction=2.288, Age=33, Outcome=1)]

## Common filtering

### starts with

In [18]:
def startswith(dataframe,columns:str,value:str):
    """ return all row of dataframe as a new dataframe that starts with the value  
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe

            columns(str) : columns of dataframe
            
                   

        Returns
        --------------
              dataframe_new(:class:`DataFrame): dataframe of the value"""
    dataframe_new=dataframe.where(dataframe[columns].startswith(value))
    return dataframe_new

In [19]:
startswith(df,"Glucose","89").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          1|     89|           76|           34|     37|31.2|                   0.192| 23|      0|
|          3|     89|           74|           16|     85|30.4|                   0.551| 38|      0|
|          2|     89|           90|           30|      0|33.5|                   0.292| 42|      0|
|          1|     89|           24|           19|     25|27.8|                   0.559| 21|      0|
|          9|     89|           62|            0|      0|22.5|                   0.142| 33|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+


### endswith

In [20]:
def endswith(dataframe,columns:str,value:str):
    """ return all row of dataframe as a new dataframe that ends with the value 
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe

            columns(str) : columns of dataframe
            
                   

        Returns
        --------------
              dataframe_new(:class:`DataFrame): dataframe of the value"""
    dataframe_new=dataframe.where(dataframe[columns].endswith(value))
    return dataframe_new

In [21]:
endswith(df,"DiabetesPedigreeFunction","2").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          8|    125|           96|            0|      0| 0.0|                   0.232| 54|      1|
|         10|    122|           78|           31|      0|27.6|                   0.512| 45|      0|
|         11|    138|           76|            0|      0|33.2|                    0.42| 35|      0|
|          5|     88|           66|           21|     23|24.4|                   0.342| 30|      0|
|          0|    100|           88|           60|    110|46.8|                   0.962| 31|      0|
|          2|     74|            0|            0|      0| 0.0|                   0.102| 22|      0|


### filter by condition

In [22]:
def filt(dataframe,condition:str):
    """ return all row of dataframe as dataframe that filter by condition 
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe

            columns(str) : columns of dataframe
            
                   

        Returns:
        --------------
              dataframe_new(:class:`DataFrame): dataframe of the value
              
        Example:
        --------------
        dataframe_new = filt (df,df["Ward"]=="value") 

              """
        
        
    dataframe_new=dataframe.filter(condition)
    return dataframe_new

In [23]:
filt(df,df["SkinThickness"]=="0").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|         10|    115|            0|            0|      0|35.3|                   0.134| 29|      0|
|          8|    125|           96|            0|      0| 0.0|                   0.232| 54|      1|
|          4|    110|           92|            0|      0|37.6|                   0.191| 30|      0|
|         10|    168|           74|            0|      0|38.0|                   0.537| 34|      1|
|         10|    139|           80|            0|      0|27.1|                   1.441| 57|      0|


## sort data

### count of data in column

In [24]:
def groupbydesc(dataframe,columnname:str):
  """ return  a sorted dataframe by  columnname
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           

            columnname(str):select the column
        Returns:
        --------------
            df_sorted (:class:`DataFrame) : new sorted dataframe 
            

            """
  from pyspark.sql.functions import col
  df_sorted = dataframe.groupby(columnname).count().orderBy(col("count").desc())
  return df_sorted

In [25]:
groupbydesc(df,"Glucose").show()

+-------+-----+
|Glucose|count|
+-------+-----+
|    100|   17|
|     99|   17|
|    111|   14|
|    106|   14|
|    129|   14|
|    125|   14|
|    108|   13|
|    112|   13|
|    105|   13|
|    102|   13|
|     95|   13|
|    122|   12|
|    109|   12|
|    128|   11|
|    117|   11|
|    120|   11|
|    107|   11|
|    119|   11|
|    114|   11|
|    124|   11|
+-------+-----+
only showing top 20 rows



### ascending or descending sort data 

In [26]:
def sortascending(dataframe,coloumes:str,ascending:bool):
    """ return all sorted  dataframe with  ascending or descending method
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe

            columns(str) : columns of dataframe
            
            ascending(bool): if ascending equal to true it's sorted by ascending if it's equal to False sorted descending
            
                   

        Returns:
        --------------
            dataframe_asc (:class:`DataFrame) : dataframe of sort value"""

    dataframe_asc = dataframe.orderBy(coloumes,ascending=ascending) 
    return dataframe_asc

In [27]:
sortascending(df,"BloodPressure",False).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     96|          122|            0|      0|22.4|                   0.207| 27|      0|
|         13|    158|          114|            0|      0|42.3|                   0.257| 44|      1|
|          9|    171|          110|           24|    240|45.4|                   0.721| 54|      1|
|          0|    129|          110|           46|    130|67.1|                   0.319| 26|      1|
|          4|    189|          110|           31|      0|28.5|                    0.68| 37|      0|
|          5|    103|          108|           37|      0|39.2|                   0.305| 65|      0|
|          5|    137|          108|            0|      0|48.8|                   0.227| 37|      1|


### filter columns using is in

In [28]:
def isin (dataframe,columns:str,value:str):
   """ return  a  dataframe  where the columns contain Specific  value
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe

            columns(str) : columns of dataframe
            
            value(str): value in columns
                   

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : datafram contain Specific  value"""
   dataframe_new = dataframe[dataframe[columns].isin(value)]
   return dataframe_new

In [119]:
isin(df,"BloodPressure","72").show(20)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          5|    166|           72|           19|    175|25.8|                   0.587| 51|      1|
|          4|    111|           72|           47|    207|37.1|                    1.39| 56|      1|
|          8|    133|           72|         null|   null|32.9|                    0.27| 39|      1|
|          5|     95|           72|           33|   null|37.7|                    0.37| 27|      0|
|         13|    106|           72|           54|   null|36.6|                   0.178| 45|      0|
|          4|    134|           72|         null|   null|23.8|                   0.277| 60|      1|


## join tow dataframe

In [30]:
def join(dataframe1,dataframe2,list_coloumes:str,how:str):
    """ return  a  dataframe  combinong two dataframes
            
        Parameters:
        -------------- 
                   
            dataframe1(:class:`DataFrame) : select dataframe  number 1

            dataframe2(:class:`DataFrame) : select dataframe  number 2

            list_coloumes(str) : columns of dataframe
            
            how(str):   inner', 'outer', 'full', 'fullouter', 'full_outer', 
                        'leftouter', 'left', 'left_outer', 'rightouter', 'right', 
                        'right_outer', 'leftsemi', 'left_semi', 'semi', 'leftanti', 
                        'left_anti', 'anti', 'cross' joins
                   

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : combinong dataframe """
    dataframe_join=dataframe1.join(dataframe2,list_coloumes,how)
    return  dataframe_join

In [31]:
df1=select(df,["Pregnancies","Glucose"])
df2=select(df,["Glucose","Insulin"])

In [32]:
join(df1,df2,["Glucose"],how="full").show()

+-------+-----------+-------+
|Glucose|Pregnancies|Insulin|
+-------+-----------+-------+
|      0|          1|      0|
|      0|          1|     23|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|     23|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|     23|
|      0|          1|      0|
|      0|          1|      0|
|      0|          1|      0|
|      0|          5|      0|
|      0|          5|     23|
|      0|          5|      0|
|      0|          5|      0|
|      0|          5|      0|
+-------+-----------+-------+
only showing top 20 rows



## columns operation

### make new column

In [33]:
def makecolumn(dataframe,name:str,opration):
    """ return  a new  dataframe  from old dataframe by using opration
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            name(str):  name of new coloumes
            opration : opertion on some coloumes  

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe 
            
        Example:
        --------------
        Create a new column named c1 from twice the value of the ward column . 
            dataframe_new=dataframe.withColumn("c1",2*dataframe['ward'])    
            """
    dataframe_new=dataframe.withColumn(name,opration)
    
    return dataframe_new

In [35]:
makecolumn(df,"opt",2*df['Pregnancies']).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|opt|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1| 12|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|  2|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1| 16|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|  2|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|  0|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0| 10|
|          3|     78|           50|           32|     88|31.0|  

### change the type of one coloume


In [36]:
def changetype(dataframe,name_coloume:str,datatype:str):
    """ return  a new  dataframe  from old dataframe with changing data type
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            name_coloume(str):  name of new coloumes
            datatype : int,float,str 

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe 
            

            """
    from pyspark.sql.functions import col
    dataframe_new=dataframe.withColumn(name_coloume,col = col(name_coloume).cast(datatype))
    return dataframe_new

In [37]:
changetype(df,"SkinThickness","string").dtypes

[('Pregnancies', 'int'),
 ('Glucose', 'int'),
 ('BloodPressure', 'int'),
 ('SkinThickness', 'string'),
 ('Insulin', 'int'),
 ('BMI', 'double'),
 ('DiabetesPedigreeFunction', 'double'),
 ('Age', 'int'),
 ('Outcome', 'int')]

### concat two columns

In [38]:
def concatcolumn(dataframe,column1:str,column2:str,aliasname:str,sep:str):
    """ return  a  dataframe  combinong two dataframes
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            column1(str):  column name 1
            column2(str):  column name 2 
            aliasname(str) : name of new column 
            sep(str) : seperator 

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe 
            

            """
    from pyspark.sql.functions import concat_ws
    dataframe_new =dataframe.select(concat_ws(sep,dataframe[column1],dataframe[column2]).alias(aliasname))
    return dataframe_new

In [39]:
concatcolumn(df,"Pregnancies","Insulin","concat","/").show()

+------+
|concat|
+------+
|   6/0|
|   1/0|
|   8/0|
|  1/94|
| 0/168|
|   5/0|
|  3/88|
|  10/0|
| 2/543|
|   8/0|
|   4/0|
|  10/0|
|  10/0|
| 1/846|
| 5/175|
|   7/0|
| 0/230|
|   7/0|
|  1/83|
|  1/96|
+------+
only showing top 20 rows



### new column by splite

In [47]:
def spliting(dataframe,column:str,sep:str,aliasname:str):
  """ create  a new dataframe from spliting the old dataframe
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            column(str):  column name 
            
            aliasname(str) : name of new column 
            sep(str) : seperator 

        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe 
            

            """
  from pyspark.sql.functions import split
  dataframe_new=dataframe.withColumn(aliasname,split(dataframe[column],sep))
  return dataframe_new

In [48]:
spliting(df,"Glucose","1","splitcol").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|splitcol|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|  [, 48]|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|    [85]|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|  [, 83]|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|    [89]|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|  [, 37]|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0| [, , 6]|
|          3|     7

### Construct a new dynamic column From two columns 


In [60]:
def newcolumn(dataframe,newcolumnname:str,column1:str,column2:str):
    """ Construct a new dynamic column From two columns 
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            newcolumnname(str):  column name 
            
            column1(str) : name of  column 1
            column2(str) : name of  column 2 
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe 
            

            """
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumnname, F.when((dataframe[column1].isNotNull() & dataframe[column2].isNotNull())
                                     , F.concat(dataframe[column1], dataframe[column2])).otherwise(F.lit(None)))
    return dataframe_new

In [71]:
show(newcolumn(df,"Table","BloodPressure","SkinThickness"),5,False)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI |DiabetesPedigreeFunction|Age|Outcome|Table|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|6          |148    |72           |35           |null   |33.6|0.627                   |50 |1      |7235 |
|1          |85     |66           |29           |null   |26.6|0.351                   |31 |0      |6629 |
|8          |183    |64           |null         |null   |23.3|0.672                   |32 |1      |null |
|1          |89     |66           |23           |94     |28.1|0.167                   |21 |0      |6623 |
|0          |137    |40           |35           |168    |43.1|2.288                   |33 |1      |4035 |
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
only showing top 5 rows



### Remove columns

In [111]:
def removecolumns(dataframe,columnsname:str):
  """ remove specific  column in dataframe 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
                       
            column(str):  column name 
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with remove a column
            
  """

  dataframe_new = dataframe.drop(columnsname)
  return dataframe_new

In [112]:
removecolumns(df,"Glucose").show()

+-----------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|           72|           35|   null|33.6|                   0.627| 50|      1|
|          1|           66|           29|   null|26.6|                   0.351| 31|      0|
|          8|           64|         null|   null|23.3|                   0.672| 32|      1|
|          1|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|           74|         null|   null|25.6|                   0.201| 30|      0|
|          3|           50|           32|     88|31.0|                   0.248| 26|      1|
|         10|         null|         null|   null|35.3|                   0.134| 

### remove other columns of  dataframe
if specefic column doesn't have the character

In [120]:
def removecontain(dataframe,column:str,character:str):
      
    """ remove other columns of  dataframe if specefic column doesn't have the character
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
                       
            column(str):  column name 

            character(str): specific character name
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with remove  columns
            
    """


    dataframe = dataframe.filter(dataframe[column].contains(character))
    return dataframe

In [122]:
removecontain(df,"BloodPressure","72").show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          5|    166|           72|           19|    175|25.8|                   0.587| 51|      1|
|          4|    111|           72|           47|    207|37.1|                    1.39| 56|      1|
|          8|    133|           72|         null|   null|32.9|                    0.27| 39|      1|
|          5|     95|           72|           33|   null|37.7|                    0.37| 27|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



if the column doesn't start with a specific character

In [123]:
def removestart(dataframe,column,character:str):
          
    """ remove other columns of dataframe  if the column doesn't start with a specific character
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
                       
            column(str):  column name 

            character(str): specific character name
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with remove  columns
            
    """

    dataframe = dataframe.filter(dataframe[column].startswith(character))
    return dataframe
    

In [124]:
removestart(df,"BloodPressure","7").show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|
|          2|    197|           70|           45|    543|30.5|                   0.158| 53|      1|
|         10|    168|           74|         null|   null|38.0|                   0.537| 34|      1|
|          5|    166|           72|           19|    175|25.8|                   0.587| 51|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [130]:
def removeend(dataframe,column,character:str):
              
    """ remove whole dataframe if it doesn't end with a specific character
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
                       
            column(str):  column name 

            character(str): specific character name
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with remove  columns
            
    """
    dataframe = dataframe.filter(dataframe[column].endswith(character))
    return dataframe

In [131]:
removeend(df,"BloodPressure","2").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          4|    110|           92|         null|   null|37.6|                   0.191| 30|      0|
|          5|    166|           72|           19|    175|25.8|                   0.587| 51|      1|
|         13|    145|           82|           19|    110|22.2|                   0.245| 57|      0|
|          5|    117|           92|         null|   null|34.1|                   0.337| 38|      0|
|          6|     92|           92|         null|   null|19.9|                   0.188| 28|      0|
|          4|    111|           72|           47|    207|37.1|                    1.39| 56|      1|


### writte sql Query for dataframe

In [62]:
def createQuery(dataframe,name:str,Query:str):
   """ Construct a new datafram(table) with sql Query
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            name(str):   table name
            Query(str): sql Query

        Returns:
        --------------
            sparksql (:class:`DataFrame) : new dataframe



          Examples
        --------
        >>> df.createOrReplaceTempView("people")
        >>> df2 = df.filter(df.age > 3)
        >>> df2.createOrReplaceTempView("people")
        >>> df3 = spark.sql("select * from people")
        >>> sorted(df3.collect()) == sorted(df2.collect())
        True
        >>> spark.catalog.dropTempView("people")
        """
   dataframe.createOrReplaceTempView(name)
   sparksql=spark.sql(Query)
   return sparksql

In [74]:
show(createQuery(df,"Table","select * from Table where Outcome=1"),5,False)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI |DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|6          |148    |72           |35           |null   |33.6|0.627                   |50 |1      |
|8          |183    |64           |null         |null   |23.3|0.672                   |32 |1      |
|0          |137    |40           |35           |168    |43.1|2.288                   |33 |1      |
|3          |78     |50           |32           |88     |31.0|0.248                   |26 |1      |
|2          |197    |70           |45           |543    |30.5|0.158                   |53 |1      |
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



## handeling null value
according to the information of this dataset all zero value in columns are considered null value except Pregnancies column

In [50]:
def replacewithnull(dataframe,specificvalue:Union[int,str],columnsname:str):
    
  """ replace specific value  column in dataframe with null value
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            specificvalue(int,str) : specific value 
            
            column(list,otional):  column name 
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with specific value
            
  """
  dataframe_new = dataframe.replace({specificvalue: None}, subset=[columnsname])
  return dataframe_new

In [53]:
for item in df.columns[1:-1]:
  df=replacewithnull(df,0,item)


In [55]:
show(df,5,True)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



### list of null value count

In [56]:
def null_value_calc(dataframe)->list:
  """ return  a  list of null value
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           

        Returns:
        --------------
            (null_columns_counts) (list) : list of null value
            

            """
  from pyspark.sql.functions import col
  null_columns_counts=[]
  numrows=dataframe.count()
  for k in dataframe.columns:
    nullrows=dataframe.where(col(k).isNull()).count()
    if (nullrows>0):
      temp = k,nullrows,(nullrows/numrows)*100
      null_columns_counts.append(temp)
  return (null_columns_counts)

In [57]:
null_value_calc(df)

[('Glucose', 5, 0.6510416666666667),
 ('BloodPressure', 35, 4.557291666666666),
 ('SkinThickness', 227, 29.557291666666668),
 ('Insulin', 374, 48.69791666666667),
 ('BMI', 11, 1.4322916666666665)]

### Replace all nulls with a specific value


In [75]:
def replacenull(dataframe,columns:str,newvalue:str):
    """ replace all missing data with optional value 
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newvalue(str) : optional value

            columns(str) : name of  column
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with missing replace 
            

        """
    dataframe = dataframe.fillna({columns: newvalue})
    return dataframe

In [77]:
replacenull(df,"Insulin","5").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      5|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      5|26.6|                   0.351| 31|      0|
|          8|    183|           64|         null|      5|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|         null|      5|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


### find basic statistic parameter to replace with null values

In [80]:
def findstatistict(dataframe,column:str)->dict:
    """ return basic statistic parameter 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
                       
            column(str):  column name 
            
        Returns:
        --------------
            avg(int): average of the column

            stddev(int): standard deviation of the column

            min(int): minimum of the column

            max(int): maximum of the column

            q1(int): quartile first of the column

            q2(int): quartile second of the column

            q3(int): quartile third of the column
            
    """





    avg=eval(dataframe.summary().select("summary",column).collect()[1][1])
    stddev=eval(dataframe.summary().select("summary",column).collect()[2][1])
    min=eval(dataframe.summary().select("summary",column).collect()[3][1])
    max=eval(dataframe.summary().select("summary",column).collect()[7][1])
    q1=eval(dataframe.summary().select("summary",column).collect()[4][1])
    q2=eval(dataframe.summary().select("summary",column).collect()[5][1])
    q3=eval(dataframe.summary().select("summary",column).collect()[6][1])

    base={"avg":avg,"stddev":stddev,"min":min,"max":max,"quartile1":q1,"quartile2":q2,"quartile3":q3}
    return base


In [85]:
for item in df.columns[1:-1]:
  print(item,": ",findstatistict(df,item))

Glucose :  {'avg': 121.6867627785059, 'stddev': 30.53564107280403, 'min': 44, 'max': 199, 'quartile1': 99, 'quartile2': 117, 'quartile3': 141}
BloodPressure :  {'avg': 72.40518417462484, 'stddev': 12.382158210105265, 'min': 24, 'max': 122, 'quartile1': 64, 'quartile2': 72, 'quartile3': 80}
SkinThickness :  {'avg': 29.153419593345657, 'stddev': 10.476982369987208, 'min': 7, 'max': 99, 'quartile1': 22, 'quartile2': 29, 'quartile3': 36}
Insulin :  {'avg': 155.5482233502538, 'stddev': 118.77585518724517, 'min': 14, 'max': 846, 'quartile1': 76, 'quartile2': 125, 'quartile3': 190}
BMI :  {'avg': 32.45746367239099, 'stddev': 6.924988332105911, 'min': 18.2, 'max': 67.1, 'quartile1': 27.5, 'quartile2': 32.3, 'quartile3': 36.6}
DiabetesPedigreeFunction :  {'avg': 0.4718763020833327, 'stddev': 0.331328595012775, 'min': 0.078, 'max': 2.42, 'quartile1': 0.243, 'quartile2': 0.371, 'quartile3': 0.626}
Age :  {'avg': 33.240885416666664, 'stddev': 11.760231540678689, 'min': 21, 'max': 81, 'quartile1': 

### fill all nulls with a specific value

In [78]:
def fillnull(dataframe,newvalue:Union[str,int],subset:list):
  """ replace all missing data with optional value for entire dataframe
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newvalue(str,int) : optional value

            columns(str) : name of  column
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with missing replace 
            

        """
  dataframe_new=dataframe.na.fill(newvalue,subset)
  return dataframe_new

In [86]:
fillnull(df,72.40518417462484,df.columns[1:-1]).show()

+-----------+-------+-------------+-------------+-------+-----------------+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|              BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+-----------------+------------------------+---+-------+
|          6|    148|           72|           35|     72|             33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|     72|             26.6|                   0.351| 31|      0|
|          8|    183|           64|           72|     72|             23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|             28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|             43.1|                   2.288| 33|      1|
|          5|    116|           74|           72|     72|             25.6|                   0.

###drop null values
How and Thresh and Subset is optional

In [94]:
def dropnnull(dataframe,thresh:int=None,how:str="any",subset:list=None):
  """ drop all missing data column in dataframe
            
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            thresh(int,optional) : optional value

            how(str,optional) : include (any, all)
            
            subset(list,optional): list of the column
            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with drop column 
            
  """
  dataframe_new=dataframe.na.drop(how,thresh=thresh,subset=subset)
  return dataframe_new

In [95]:
dropnnull(df).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|
|          2|    197|           70|           45|    543|30.5|                   0.158| 53|      1|
|          1|    189|           60|           23|    846|30.1|                   0.398| 59|      1|
|          5|    166|           72|           19|    175|25.8|                   0.587| 51|      1|
|          0|    118|           84|           47|    230|45.8|                   0.551| 31|      1|


### impute null data

#### avarage strategy

In [220]:
def imputnullavg(dataframe,inputCols:list,outputCols:list):   
    """ return a dataframe  with imputing null value with average 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            inputCols(list) : name of input columns 

            outputCols(list) : name of output columns 

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe   with imputing null value with average 
            
            """
    from pyspark.ml.feature import Imputer
    a= Imputer(inputCols=inputCols,outputCols=outputCols)
    a.setStrategy("mean")
    dataframe = a.fit(dataframe).transform(dataframe)
    return dataframe

In [232]:
df_avg=imputnullavg(df,df.columns[1:-1],df.columns[1:-1])

In [233]:
df_avg.show()

+-----------+-------+-------------+-------------+-------+-----------------+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|              BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+-----------------+------------------------+---+-------+
|          6|    148|           72|           35|    155|             33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|    155|             26.6|                   0.351| 31|      0|
|          8|    183|           64|           29|    155|             23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|             28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|             43.1|                   2.288| 33|      1|
|          5|    116|           74|           29|    155|             25.6|                   0.

#### median strategy


In [239]:
def imputnullmedian(dataframe,inputCols:list,outputCols:list):   
    """ return a dataframe  with imputing null value with median 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            inputCols(str) : name of input columns 

            outputCols(str) : name of output columns 

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe   with imputing null value with median 
            
            """
    from pyspark.ml.feature import Imputer
    a= Imputer(inputCols=inputCols,outputCols=outputCols)
    a.setStrategy("median")
    dataframe=a.fit(dataframe).transform(dataframe)
    return dataframe

## we select median Strategy

In [242]:
df=imputnullmedian(df,df.columns[1:-1],df.columns[1:-1])

In [243]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|    125|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|    125|26.6|                   0.351| 31|      0|
|          8|    183|           64|           29|    125|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|           29|    125|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


## Number Operations


### Round the number

In [132]:
def roundnumber(dataframe,newcolumn:str,column:str,scale:int):
    """ return round of integer columns
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            column(str):   column name

            scale(int): a scale of rounding


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with  with a round of a columns
            """
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, F.round(dataframe[column], scale))
    return dataframe_new

In [137]:
roundnumber(df,"round","DiabetesPedigreeFunction",1).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|round|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|  0.6|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|  0.4|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|  0.7|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|  0.2|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|  2.3|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|  0.2|
|          3|     78|           50|           

### Floor of the number

In [141]:
def floarnumber(dataframe,newcolumn:str,column:str):
    """ return floor of integer columns
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            column(str):   column name

            scale(int): a scale of rounding


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with  with a round of a columns
            """
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, F.floor(dataframe[column]))
    return dataframe_new

In [142]:
floarnumber(df,"floor","DiabetesPedigreeFunction").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|floor|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-----+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|    0|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|    0|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|    0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|    0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    2|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|    0|
|          3|     78|           50|           

### Ceiling 

In [143]:
def Ceilingnumber(dataframe,newcolumn:str,column:str):
    """ return Ceilin of integer columns
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            column(str):   column name

          


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with Ceilin a columns
            """
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, F.ceil(dataframe[column]))
    return dataframe_new

In [144]:
Ceilingnumber(df,"ceil","DiabetesPedigreeFunction").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|ceil|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+----+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|   1|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|   1|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|   1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|   1|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|   3|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|   1|
|          3|     78|           50|           32|     8

### raised to power

In [148]:
def powernumber(dataframe,newcolumn:str,column1:str,column2:str):
    """ return a column calculate first column power by second column 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            column1(str): first  column name

            column2(str): second  column name

          


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with a column calculate first column power by second column
            """
    from pyspark.sql import functions as F
    dataframe = dataframe.withColumn(newcolumn, F.pow(column1,column2))
    return dataframe


In [149]:
powernumber(df,"power","BMI","DiabetesPedigreeFunction").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|             power|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1| 9.057633148063402|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|3.1632495465615262|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1| 8.295893707159925|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0| 1.745556680247769|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1| 5491.411566466998|
|          5|    116|           74|         null

### Select smallest value out of multiple columns

In [169]:
def smallestcolumns( dataframe,newcolumn:str,cols:tuple):
    """ return a  smallest value out of multiple columns 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            cols(tuple):   name of the columns

          


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with a  smallest value out of multiple columns 
            """
    a=str(cols)
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, eval("F.least"+a))
    return dataframe_new

In [170]:
smallestcolumns(df,"newcolumn",("BMI","DiabetesPedigreeFunction")).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|newcolumn|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|    0.627|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|    0.351|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|    0.672|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|    0.167|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    2.288|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|    0.201|
|         

### Select largest value out of multiple columns

In [171]:
def greatestcolumns( dataframe,newcolumn:str,cols:str):
    """ return a  greatest value out of multiple columns 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe           
            
            newcolumn(str): new column name 
            
            cols(tuple):   name of the columns

          


            
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with a  greatest value out of multiple columns
            """
    from pyspark.sql import functions as F
    dataframe = dataframe.withColumn(newcolumn, eval("F.greatest"+str(cols)))
    return dataframe
                                     

In [172]:
greatestcolumns(df,"newcolumn",("BloodPressure","Insulin")).show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|newcolumn|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|       72|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|       66|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|       64|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|       94|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|      168|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|       74|
|         

### Array Operations

### create array out of columns

In [193]:
def creatarray(dataframe,newcolumn:str,cols:tuple):
    """ create array from selected columns
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            newcolumn(str): new column name 
            
            cols(tuple(str)) : name of columns

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with array column from selected columns
            """
    
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, eval("F.array"+str(cols)))
    return dataframe_new


In [194]:
df_array=creatarray(df,"newcolumn",("SkinThickness","BloodPressure"))

In [195]:
df_array.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|   newcolumn|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|    [35, 72]|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|    [29, 66]|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|  [null, 64]|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|    [23, 66]|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    [35, 40]|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|     

### Empty Array


In [190]:
def creatarray(dataframe,newcolumn):
    """ create  empty array 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            newcolumn(str): new column name 
            

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with a empty array 
            """
    
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, F.array([]))
    return dataframe_new

In [191]:
creatarray(df,"newcolumn").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|newcolumn|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|       []|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|       []|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|       []|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|       []|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|       []|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|       []|
|         

In [197]:
def sizearray(dataframe,newcolumn:str,columnarray:str):
    """ return size of array
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            newcolumn(str): new column name 

            columnarray(str) : name of column array
            

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe  
            """
    from pyspark.sql import functions as F
    dataframe_new = dataframe.withColumn(newcolumn, F.size(F.col(columnarray)))
    return dataframe_new

In [198]:
sizearray(df_array,"size","newcolumn").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|   newcolumn|size|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+----+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|    [35, 72]|   2|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|    [29, 66]|   2|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|  [null, 64]|   2|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|    [23, 66]|   2|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|    [35, 40]|   2|
|          5|    116|           74|         null|   null

## Aggregation Operations

#### Row Count       

In [199]:
def valuecount(dataframe,columname:str)->int:
    """ return count of selected column
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            


            columname(str) : name of column 
            

            
            
                       
        Returns:
        --------------
            count (int) :  count of selected column 
            """
    count=dataframe.select(columname).count()
    return count

In [200]:
valuecount(df,"BloodPressure")

768

### mean of column

In [207]:
def meancolumn(dataframe,groupbycolumns:tuple,meancolumns:str):
    """ return mean of selected column
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            groupbycolumns(tuple) : name of columns 

            meancolumns(str):mean column
            

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with mean of selected column
            
            """

    dataframe_new=dataframe.groupBy(groupbycolumns).mean(meancolumns)
    return dataframe_new

In [206]:
meancolumn(df,(df.columns[1:-1]),"BloodPressure").show()

+-------+-------------+-------------+-------+----+------------------------+---+------------------+
|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|avg(BloodPressure)|
+-------+-------------+-------------+-------+----+------------------------+---+------------------+
|    143|           94|           33|    146|36.6|                   0.254| 51|              94.0|
|     90|           80|           14|     55|24.4|                   0.249| 24|              80.0|
|     84|           68|           30|    106|31.9|                   0.591| 25|              68.0|
|     94|           70|           27|    115|43.5|                   0.347| 21|              70.0|
|    130|           78|           23|     79|28.4|                   0.323| 34|              78.0|
|    197|           70|           99|   null|34.7|                   0.575| 62|              70.0|
|    173|           74|         null|   null|36.8|                   0.088| 38|              74.0|
|     75| 

### max of columns

In [201]:
def maxncloumn(dataframe,groupbycolumns:tuple,maxcolumns:str):
    """ return max of selected column
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            groupbycolumns(tuple) : name of columns 

            maxcolumns(str):mean column
            

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with max of selected column
            
            """
    dataframe_new=dataframe.groupBy(groupbycolumns).max(maxcolumns)
    return dataframe_new

In [210]:
maxncloumn(df,(df.columns[1:-3]),"BloodPressure").show()

+-------+-------------+-------------+-------+----+------------------+
|Glucose|BloodPressure|SkinThickness|Insulin| BMI|max(BloodPressure)|
+-------+-------------+-------------+-------+----+------------------+
|    112|           66|         null|   null|37.8|                66|
|    146|         null|         null|   null|27.5|              null|
|    114|           64|         null|   null|27.4|                64|
|     87|           68|           34|     77|37.6|                68|
|    136|           84|           35|    130|28.3|                84|
|     87|           80|         null|   null|23.2|                80|
|    122|           86|         null|   null|34.7|                86|
|     94|           65|           22|   null|24.7|                65|
|    116|           74|         null|   null|25.6|                74|
|    101|           50|           15|     36|24.2|                50|
|    100|           68|           25|     71|38.5|                68|
|    136|           

### min columns

In [212]:
def mincloumn(dataframe,groupbycolumns:tuple,mincloumns:str):
    """ return min of selected column
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            groupbycolumns(tuple) : name of columns 

            mincloumns(str):mean column
            

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe with min of selected column
            
            """
    dataframe_new=dataframe.groupBy(groupbycolumns).min(mincloumns)
    return dataframe_new

In [213]:
mincloumn(df,(df.columns[1:-3]),"BloodPressure").show()

+-------+-------------+-------------+-------+----+------------------+
|Glucose|BloodPressure|SkinThickness|Insulin| BMI|min(BloodPressure)|
+-------+-------------+-------------+-------+----+------------------+
|    112|           66|         null|   null|37.8|                66|
|    146|         null|         null|   null|27.5|              null|
|    114|           64|         null|   null|27.4|                64|
|     87|           68|           34|     77|37.6|                68|
|    136|           84|           35|    130|28.3|                84|
|     87|           80|         null|   null|23.2|                80|
|    122|           86|         null|   null|34.7|                86|
|     94|           65|           22|   null|24.7|                65|
|    116|           74|         null|   null|25.6|                74|
|    101|           50|           15|     36|24.2|                50|
|    100|           68|           25|     71|38.5|                68|
|    136|           

### Advanced Operations


In [214]:
def repartition(dataframe,column:str):
    """ return dataframe  with a new repartition
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : select dataframe        
            
            column(str) : name of columns 

          

            
            
                       
        Returns:
        --------------
            dataframe_new (:class:`DataFrame) : new dataframe  with a new repartition
            
            """

    dataframe_new = dataframe.repartition(dataframe[column])
    return dataframe_new


In [216]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|   null|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|   null|26.6|                   0.351| 31|      0|
|          8|    183|           64|         null|   null|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|         null|   null|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


### Repartition
PySpark Repartition provides a full shuffling of data

In [218]:
repartition(df,"Insulin").show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          0|    137|           68|           14|    148|24.8|                   0.143| 21|      0|
|          0|    106|           70|           37|    148|39.4|                   0.605| 22|      0|
|          2|    155|           52|           27|    540|38.7|                    0.24| 25|      1|
|          7|    187|           50|           33|    392|33.9|                   0.826| 34|      1|
|          3|    113|           50|           10|     85|29.5|                   0.626| 25|      0|
|          3|     89|           74|           16|     85|30.4|                   0.551| 38|      0|
|          1|     86|           66|           52|     65|41.3|                   0.917| 29|      0|


## machine learning

In [322]:
# Data Prep function
    
def MLClassifierDFPrep(df,input_columns,dependent_var,treat_outliers=True,treat_neg_values=True):

    renamed = df.withColumn("label_str", df[dependent_var].cast(StringType())) 
    indexer = StringIndexer(inputCol="label_str", outputCol="label") 
    indexed = indexer.fit(renamed).transform(renamed)

    numeric_inputs = []
    string_inputs = []
    for column in input_columns:
        if str(indexed.schema[column].dataType) == 'StringType':
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
            indexed = indexer.fit(indexed).transform(indexed)
            new_col_name = column+"_num"
            string_inputs.append(new_col_name)
        else:
            numeric_inputs.append(column)
            
    if treat_outliers == True:
        print("We are correcting for non normality now!")
        d = {}
        for col in numeric_inputs: 
            d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) 
        for col in numeric_inputs:
            skew = indexed.agg(skewness(indexed[col])).collect() 
            skew = skew[0][0]
            if skew > 1:
                indexed = indexed.withColumn(col, \
                log(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] ) +1).alias(col))
                print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
            elif skew < -1:
                indexed = indexed.withColumn(col, \
                exp(when(df[col] < d[col][0],d[col][0])\
                .when(indexed[col] > d[col][1], d[col][1])\
                .otherwise(indexed[col] )).alias(col))
                print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

            
    minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 
    min_array = minimums.select(array(numeric_inputs).alias("mins")) 
    df_minimum = min_array.select(array_min(min_array.mins)).collect() 
    df_minimum = df_minimum[0][0] 

    features_list = numeric_inputs + string_inputs
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    output = assembler.transform(indexed).select('features','label')

    if df_minimum < 0:
        print(" ")
        print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
        print(" ")
    
    if treat_neg_values == True:
        print("You have opted to correct that by rescaling all your features to a range of 0 to 1")
        print(" ")
        print("We are rescaling you dataframe....")
        scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

        scalerModel = scaler.fit(output)

        scaled_data = scalerModel.transform(output)
        final_data = scaled_data.select('label','scaledFeatures')
        final_data = final_data.withColumnRenamed('scaledFeatures','features')
        print("Done!")

    else:
        print("You have opted not to correct that therefore you will not be able to use to Naive Bayes classifier")
        print("We will return the dataframe unscaled.")
        final_data = output
    
    return final_data

### classification 



#### MultilayerPerceptronClassifier

In [350]:
def MultilayerPerceptronClassifiermodel(features,classes,folds,train,test,input_columns):
    """ return a Multilayer Perceptron Classifier accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return a Multilayer Perceptron Classifier  accuracy
              
            
            """  




    from pyspark.ml.classification import MultilayerPerceptronClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    classifier=MultilayerPerceptronClassifier()
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
              
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
                      
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
  
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### OneVsRest

In [330]:
def OneVsRestmodel(features,classes,folds,train,test,input_columns):
    """ return a One Vs Rest accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return One Vs Rest  accuracy
              
            
            """  
    from pyspark.ml.classification import OneVsRest,LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=OneVsRest()
    Mtype="OneVsRest"

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel

    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extract list of binary models
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)


    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### LogisticRegression

In [331]:
def LogisticRegressionmodel(features,classes,folds,train,test,input_columns) :
    """ return Logistic Regression accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return Logistic Regression  accuracy
              
            
            """  
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=LogisticRegression()
    def FindMtype(classifier):
    
        M = classifier
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        


       
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                

            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
       
        # Print the coefficients
        if Mtype in("LogisticRegression"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            print("Intercept: " + str(BestModel.interceptVector))
            print('\033[1m' + " Top 20 Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
            # Convert from numpy array to list
            coeff_array = BestModel.coefficientMatrix.toArray()
            coeff_scores = []
            for x in coeff_array[0]:
                coeff_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
            print(result.orderBy(result["coeff"].desc()).show(truncate=False))
            # Save the coefficient values and the models
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### Naive Bayes model

In [332]:
def NaiveBayesmodel(features,classes,folds,train,test,input_columns) :
    """ return Naive Bayes accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return Naive Bayes  accuracy
              
            """    
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=NaiveBayes()
    
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        
        
      
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            
            
          
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        
       



        # Print the coefficients
       
        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            print("Intercept: " + str(BestModel.intercept))
            print('\033[1m' + "Top 20 Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
#             print("Coefficients: \n" + str(BestModel.coefficients))
            coeff_array = BestModel.coefficients.toArray()
            coeff_scores = []
            for x in coeff_array:
                coeff_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
            print(result.orderBy(result["coeff"].desc()).show(truncate=False))
            # Save the coefficient values and the models
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### Random Forest Classifier model

In [333]:
def RandomForestClassifiermodel(features,classes,folds,train,test,input_columns) :
    """ return Random Forest Classifier accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return Random Forest Classifier  accuracy
              
            """
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=RandomForestClassifier()
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        
       
        
  

            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                


            

            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:




            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Top 20 Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            featureImportances = BestModel.featureImportances.toArray()
            # Convert from numpy array to list
            imp_scores = []
            for x in featureImportances:
                imp_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
            print(result.orderBy(result["score"].desc()).show(truncate=False))
            

            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

  
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### Decision Tree Classifier model

In [334]:
def DecisionTreeClassifiermodel(features,classes,folds,train,test,input_columns) :
    """ return  Decision Tree Classifier accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return Decision Tree Classifier  accuracy
              
            """
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=DecisionTreeClassifier()
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        
       
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  

            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:


            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Top 20 Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            featureImportances = BestModel.featureImportances.toArray()
            # Convert from numpy array to list
            imp_scores = []
            for x in featureImportances:
                imp_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
            print(result.orderBy(result["score"].desc()).show(truncate=False))
            
            # Save the feature importance values and the models
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel


        # Print the coefficients

        # Print the Coefficients

        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
        
    return result
    #Also returns the fit model important scores or p values

#### GBTClassifier

In [335]:
def GBTClassifiermodel(features,classes,folds,train,test,input_columns):
    """ return GBT Classifier accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(str) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return GBT Classifier  accuracy
              
            """
    from pyspark.ml.classification import GBTClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=GBTClassifier()
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        
        
  

        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  

            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                


            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        






            # FEATURE IMPORTANCES
            # Estimate of the importance of each feature.
            # Each feature’s importance is the average of its importance across all trees 
            # in the ensemble The importance vector is normalized to sum to 1. 
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Top 20 Feature Importances"+ '\033[0m')
            print("(Scores add up to 1)")
            print("Lowest score is the least important")
            print(" ")
            featureImportances = BestModel.featureImportances.toArray()
            # Convert from numpy array to list
            imp_scores = []
            for x in featureImportances:
                imp_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
            print(result.orderBy(result["score"].desc()).show(truncate=False))
            
            # Save the feature importance values and the models

            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel


        
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

#### Linear SVC model

In [336]:
def LinearSVCmodel(features,classes,folds,train,test,input_columns):
    """ return Linear SVC accuracy 
             
        Parameters:
        -------------- 
                   
            features(:class:`DataFrame) : features column       
            
            classes(int) : number of column target 

            folds(int) : estimate the skill of the model on new data 

            train(:class:`DataFrame) : train dataframe 
            
            test(:class:`DataFrame) :   test dataframe 
            
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return Linear SVC  accuracy
              
            """
    from pyspark.ml.classification import LinearSVC
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
    classifier=LinearSVC()
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,folds,train):
        


        if Mtype in("LinearSVC") and classes != 2: # These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  


            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())

            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=folds) # 3 + is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,folds,train)
    
    # Print feature selection metrics
    if fitModel is not None:
        

        if Mtype in("LinearSVC"):
            # Get Best Model
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            print("Intercept: " + str(BestModel.intercept))
            print('\033[1m' + "Top 20 Coefficients"+ '\033[0m')
            print("You should compares these relative to eachother")
#             print("Coefficients: \n" + str(BestModel.coefficients))
            coeff_array = BestModel.coefficients.toArray()
            coeff_scores = []
            for x in coeff_array:
                coeff_scores.append(float(x))
            # Then zip with input_columns list and create a df
            result = spark.createDataFrame(zip(input_columns,coeff_scores), schema=['feature','coeff'])
            print(result.orderBy(result["coeff"].desc()).show(truncate=False))
            # Save the coefficient values and the models
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # make this a list
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # make this a string
        score = [str(accuracy)] #make this a string and convert to a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
    #Also returns the fit model important scores or p values

### test with  or without treat outliers,treat negative values

In [340]:
def modeltest(dataframe,classifier,dependent_var:str,input_columns:list,trainsplite:int,testsplite:int,seed:int,folds:int,treat_outliers:bool,treat_neg_values:bool):
    """ return all model accuracy 
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : dataframe        
            
            classifier : name of model incude () 

            dependent_var(str) : estimate the skill of the model on new data 

            input_columns(list) : input_columns
            
            trainsplite(int) :   number train splite

            testsplite(int) :   number test splite

            seed(int) : seed of train and test
            
            folds(int) : estimate the skill of the model on new data
            
            treat_outliers(bool) : fix outliers 

            treat_neg_values(bool) :fix negative value 
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return all  accuracy
              
            """
    def FindMtype(classifier):
        # Intstantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        return Mtype
    Mtype = FindMtype(classifier)
  

    final_data=MLClassifierDFPrep(df,input_columns,dependent_var,treat_outliers=treat_outliers,treat_neg_values=treat_neg_values)
    class_count = df.select(countDistinct(dependent_var)).collect()
    classes = class_count[0][0]
    classes

    train,test = final_data.randomSplit([trainsplite,testsplite],seed)
    features = final_data.select(['features']).collect()
    
    #set up your results table
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    if Mtype=="GBTClassifier":
        new_result = GBTClassifiermodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="RandomForestClassifier":
        new_result = RandomForestClassifiermodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="OneVsRest":
        new_result = OneVsRestmodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="LogisticRegression":
        new_result = LogisticRegressionmodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="NaiveBayes":
        new_result = NaiveBayesmodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="DecisionTreeClassifier":
        new_result = DecisionTreeClassifiermodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="LinearSVC":
        new_result = LinearSVCmodel(features,classes,folds,train,test,input_columns)
    elif Mtype=="MultilayerPerceptronClassifier":
      new_result = MultilayerPerceptronClassifiermodel(features,classes,folds,train,test,input_columns) 
    
    results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    
    return results

In [355]:
classifier=[GBTClassifier(),RandomForestClassifier(),OneVsRest(),
            LogisticRegression(),NaiveBayes(),DecisionTreeClassifier()
            ,LinearSVC(),MultilayerPerceptronClassifier()]

In [357]:

modeltest(df,GBTClassifier(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()

We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
 
[1mGBTClassifier  Top 20 Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+------------------------+-------------------+
|feature                 |score              |
+------------------------+-------------------+
|Glucose                 |0.2445106011892125 |
|BMI                     |0.15673488723653634|
|Age                     |0.1366733375833831 |
|DiabetesPedigreeFunction|0.11961241647189638|
|Insulin                 |0.11585728137788436|
|BloodPressure           |0.09374326190721782|
|Pregnancies             |0.0

In [358]:

modeltest(df,RandomForestClassifier(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()

We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
 
[1mRandomForestClassifier  Top 20 Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+------------------------+--------------------+
|feature                 |score               |
+------------------------+--------------------+
|Glucose                 |0.4347364963694499  |
|BMI                     |0.15543666499874026 |
|Age                     |0.14739573021724706 |
|Insulin                 |0.06807217806009147 |
|DiabetesPedigreeFunction|0.065477612609084   |
|Pregnancies             |0.058389841966355535|
|SkinThickn

In [359]:
modeltest(df,OneVsRest(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()


We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
 
[1mOneVsRest[0m
[1mIntercept: [0m 5.66188337049584 [1m
Coefficients:[0m [-2.4044265416180717,-5.2098568611396,0.7017253197643596,-0.18688371376648574,-0.1048554518342095,-3.773771799413517,-2.2191032882175987,-0.6704600044066444]
[1mIntercept: [0m -5.661883370495838 [1m
Coefficients:[0m [2.404426541618076,5.209856861139596,-0.7017253197643795,0.18688371376651206,0.10485545183422967,3.7737717994135007,2.2191032882175965,0.6704600044066383]
!!!!!Final Results!!!!!!!!
+----------+------+
|Classifier|Result|
+----------+------+
| OneVsRest| 77.11|
+

In [360]:
modeltest(df,NaiveBayes(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()


We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
!!!!!Final Results!!!!!!!!
+----------+------+
|Classifier|Result|
+----------+------+
|NaiveBayes| 63.55|
+----------+------+



In [361]:
modeltest(df,DecisionTreeClassifier(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()


We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
 
[1mDecisionTreeClassifier  Top 20 Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+------------------------+--------------------+
|feature                 |score               |
+------------------------+--------------------+
|Glucose                 |0.524669329184939   |
|BMI                     |0.14609141431697234 |
|Age                     |0.14119653880703806 |
|Pregnancies             |0.11022998666381316 |
|DiabetesPedigreeFunction|0.03547155437607893 |
|SkinThickness           |0.02007767372060785 |
|Insulin   

In [362]:
modeltest(df,LinearSVC(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()


We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
 
[1mLinearSVC[0m
Intercept: -4.217133136821038
[1mTop 20 Coefficients[0m
You should compares these relative to eachother
+------------------------+-------------------+
|feature                 |coeff              |
+------------------------+-------------------+
|Glucose                 |4.419698105145853  |
|BMI                     |2.7378577519646683 |
|DiabetesPedigreeFunction|1.8041378426905303 |
|Pregnancies             |1.58500569398122   |
|Age                     |0.20182264687068815|
|Insulin                 |0.02631035060010266|
|SkinThickness

In [363]:
modeltest(df,MultilayerPerceptronClassifier(),"Outcome",df.columns[0:-1],0.7,0.3,123,5,True,True).show()


We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
!!!!!Final Results!!!!!!!!
+--------------------+------+
|          Classifier|Result|
+--------------------+------+
|MultilayerPercept...| 72.88|
+--------------------+------+



### test with feature selection

In [382]:
def  FeatureSelectionrandommodel(dataframe,classifier,input_columns:list,dependent_var:str,trainsplite:int,testsplite:int,seed:int,folds:int,start:int,step:int):  
    """ return all model accuracy  with feature selection
             
        Parameters:
        -------------- 
                   
            dataframe(:class:`DataFrame) : dataframe        
            
            classifier : name of model incude () 

            dependent_var(str) : estimate the skill of the model on new data 

            input_columns(list) : input_columns
            
            trainsplite(int) :   number train splite

            testsplite(int) :   number test splite

            seed(int) : seed of train and test
            
            folds(int) : estimate the skill of the model on new data
            
            treat_outliers(bool) : fix outliers 

            treat_neg_values(bool) :fix negative value 
                       
        Returns:
        --------------
            result (:class:`DataFrame) : return all  accuracy
              
            """

    from pyspark.ml.feature import VectorSlicer
    from pyspark.ml.feature import ChiSqSelector
    from pyspark.sql.functions import countDistinct
    from pyspark.ml.linalg import Vectors
    import numpy as np
    def FindMtype(classifier):
          # Intstantiate Model
          M = classifier
          # Learn what it is
          Mtype = type(M).__name__

          return Mtype
    Mtype = FindMtype(classifier)
    test2_data=MLClassifierDFPrep(df,input_columns,dependent_var,treat_outliers=True,treat_neg_values=True)
    class_count = df.select(countDistinct(dependent_var)).collect()
    classes = class_count[0][0]
    classes
    #Select the top n features and view results
    maximum = len(input_columns)
    for n in np.arange(2,maximum).tolist():
        print("Testing top n = ",n," features")
        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # For Tree classifiers
            best_n_features = RF_featureimportances.argsort()[-n:][::-1]
            best_n_features= best_n_features.tolist() # convert to a list
            vs = VectorSlicer(inputCol="features", outputCol="best_features", indices=best_n_features)
            bestFeaturesDf = vs.transform(test2_data)

        else:
            selector = ChiSqSelector(numTopFeatures=n, featuresCol="features",
                                outputCol="selectedFeatures", labelCol="label")
            bestFeaturesDf = selector.fit(test2_data).transform(test2_data)
            bestFeaturesDf = bestFeaturesDf.select("label","selectedFeatures")
            bestFeaturesDf = bestFeaturesDf.withColumnRenamed("selectedFeatures","features")

        # Collect features
        features = bestFeaturesDf.select(['features']).collect()

        # Split
        train,test = bestFeaturesDf.randomSplit([0.7,0.3])

        # Specify folds


        #set up your results table
        columns = ['Classifier', 'Result']
        vals = [("Place Holder","N/A")]
        results = spark.createDataFrame(vals, columns)

        
        if Mtype=="GBTClassifier":
            new_result = GBTClassifiermodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="RandomForestClassifier":
            new_result = RandomForestClassifiermodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="OneVsRest":
            new_result = OneVsRestmodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="LogisticRegression":
            new_result = LogisticRegressionmodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="NaiveBayes":
            new_result = NaiveBayesmodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="DecisionTreeClassifier":
            new_result = DecisionTreeClassifiermodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="LinearSVC":
            new_result = LinearSVCmodel(features,classes,folds,train,test,input_columns)
        elif Mtype=="MultilayerPerceptronClassifier":
            new_result = MultilayerPerceptronClassifiermodel(features,classes,folds,train,test,input_columns)
        results = results.union(new_result)
        results = results.where("Classifier!='Place Holder'")
        results.show(100,False)

In [383]:
FeatureSelectionrandommodel(df,DecisionTreeClassifier(),df.columns[0:-1],"Outcome",0.7,0.3,123,5,1,2)

We are correcting for non normality now!
Insulin has been treated for positive (right) skewness. (skew =) 3.3734139434873414 )
DiabetesPedigreeFunction has been treated for positive (right) skewness. (skew =) 1.9161592037386226 )
Age has been treated for positive (right) skewness. (skew =) 1.127389259531697 )
You have opted to correct that by rescaling all your features to a range of 0 to 1
 
We are rescaling you dataframe....
Done!
Testing top n =  2  features
 
[1mDecisionTreeClassifier  Top 20 Feature Importances[0m
(Scores add up to 1)
Lowest score is the least important
 
+------------------------+--------------------+
|feature                 |score               |
+------------------------+--------------------+
|Glucose                 |0.5348526634496975  |
|BMI                     |0.21481439722403298 |
|Age                     |0.09713117413827096 |
|Insulin                 |0.06960094286518759 |
|DiabetesPedigreeFunction|0.04376108432647735 |
|Pregnancies             |0.02