<text> INSPER - Big Data e Computação em Nuvem <text>
    
<text> Grupo 1: André Gambry, Mainara Cardoso, Tiago Pardo <text>

# Big Data e Computação em Nuvem - Projeto Final

## Objetivo do projeto: Previsão de cancelamento de vôos por condições meteorológicas
    
## Dados: 
    Airline Delay: https://www.kaggle.com/yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018
    Airports.csv: https://github.com/PacktPublishing/Pandas-Cookbook/blob/master/data/descriptions/airports.csv?plain=1
    Weather: https://github.com/adambry/Projeto_Final_BigData/blob/main/df_weather.csv
    
Obs: Os dados de Weather foram obtidos via Web Scrapping do site: https://www.ncei.noaa.gov/
    
    




# 1. BIBLIOTECAS, SESSÃO SPARK E IMPORTAÇÃO DE DADOS




## 1.1 Importação de bibliotecas

In [22]:
from pyspark.sql.functions import dayofmonth, col, expr, year, month, date_format, when, countDistinct, trim, lower
from matplotlib import pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from io import StringIO
import requests
import pandas as pd

## 1.2 Sessão Spark

In [2]:
spark = (SparkSession
            .builder
            .master("local[4]")
            .appName("Projeto Final")
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,com.microsoft.azure:azure-storage:8.6.6")
            .config("spark.driver.memory", "4g")
            .config("spark.executor.memory", "5g")
            .getOrCreate())

In [3]:
sc = spark.sparkContext
#sc._conf.getAll()

## 1.3 Importação dos dados

### 1.3.1. Base "airline delay"

In [4]:
STORAGE_ACCOUNT = 'dlspadseastusprod'
CONTAINER = 'big-data-comp-nuvem'
FOLDER = 'airline-delay'
TOKEN = 'lSuH4ZI9BhOFEhCF/7ZQbrpPBIhgtLcPDfXjJ8lMxQZjaADW4p6tcmiZGDX9u05o7FqSE2t9d2RD+ASt0YFG8g=='

spark.conf.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)

<font color = red> 
    No snipet abaixo estamos importanto dados apenas para o arquivo do ano de 2011. Ajustar para importar dos demais anos
    <font> 

In [5]:
config = spark.sparkContext._jsc.hadoopConfiguration()
config.set("fs.azure.account.key." + STORAGE_ACCOUNT + ".blob.core.windows.net", TOKEN)

df_airline_delay = spark.read.csv("wasbs://{}@{}.blob.core.windows.net/{}/2011.csv".format(CONTAINER, STORAGE_ACCOUNT, FOLDER), header=True, inferSchema=True)

df_airline_delay.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
0,2011-01-01,MQ,4529,BOS,JFK,1830,1823.0,-7.0,68.0,1931.0,...,90.0,146.0,48.0,187.0,0.0,0.0,49.0,0.0,0.0,
1,2011-01-01,MQ,4532,BNA,DCA,1100,1052.0,-8.0,11.0,1103.0,...,95.0,88.0,74.0,562.0,,,,,,
2,2011-01-01,MQ,4532,DCA,JFK,1400,1358.0,-2.0,9.0,1407.0,...,79.0,73.0,60.0,213.0,,,,,,
3,2011-01-01,MQ,4537,RDU,JFK,1710,1706.0,-4.0,59.0,1805.0,...,105.0,159.0,85.0,426.0,0.0,0.0,50.0,0.0,0.0,
4,2011-01-01,MQ,4540,CMH,LGA,1340,1340.0,0.0,14.0,1354.0,...,105.0,95.0,77.0,478.0,,,,,,


### 1.3.2. Base "weather"

In [6]:
# URL do arquivo no GitHub
url = "https://github.com/adambry/Projeto_Final_BigData/raw/main/df_weather.csv"

df_weather = pd.read_csv(url)

df_weather = spark.createDataFrame(df_weather)

df_weather.limit(5).toPandas()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


Unnamed: 0,State,Year,Month,tavg,pcp
0,Alabama,2018,December,49.0,9.95
1,Alabama,2018,November,50.4,6.29
2,Alabama,2018,October,67.4,2.69
3,Alabama,2018,September,79.4,6.21
4,Alabama,2018,August,79.5,5.03


In [7]:
#verificando ocorrência de duplicadas na base
df_weather_duplicadas = df_weather.groupBy("State", "Year","Month", "tavg", "pcp").count().filter(col("count") > 1)

df_weather_duplicadas.count()

5880

Veriricamos que a importação dos dados via o WebScrapping duplicou cada valor na base. Portanto, removeremos as duplicadas.

In [8]:
df_weather = df_weather.dropDuplicates()

df_weather.count()

5880

Filtrando dados do ano de 2011

In [9]:
df_weather = df_weather.filter(col("Year") == 2011)

df_weather.toPandas().nunique()

State     49
Year       1
Month     12
tavg     394
pcp      391
dtype: int64

### 1.3.3. Base "airports"

In [10]:
url = 'https://raw.githubusercontent.com/adambry/Projeto_Final_BigData/main/Aeroportos_US.csv'

# Especifique as colunas de interesse
df_airports_region = pd.read_csv(url, delimiter=';')

df_airports_region = spark.createDataFrame(df_airports_region)

df_airports_region.limit(5).toPandas()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


Unnamed: 0,type,name,iso_country,iso_region,iata_code
0,small_airport,Ocean Reef Club Airport,US,FL,OCA
1,small_airport,Pilot Station Airport,US,AK,PQS
2,small_airport,Crested Butte Airpark,US,CO,CSE
3,small_airport,LBJ Ranch Airport,US,TX,JCY
4,small_airport,Nunapitchuk Airport,US,AK,NUP


# 2. TRATAMENTO NOS DADOS

## 2.1. Limpeza Aeroportos fora dos EUA

In [11]:
# Estudo da base para verificar se os estados registrados na coluna STATE na base df_airports_region estão corretos
df_airports_region.select("iso_region").distinct().count()

51

Sabemos que os Estados Unidos possuem 50 estados. Poranto, precisamos vamos identificar os excedentes e retirá-los da base

In [12]:
# identificando Estados Excedentes

# Lista de estados dos EUA (https://pt.wikipedia.org/wiki/Estados_dos_Estados_Unidos)
us_states = ['AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']

# Filtra os estados excedentes
excess_states = df_airports_region.select("iso_region").distinct().filter(~col("iso_region").isin(us_states))

# Exibe os estados excedentes
excess_states.show()


+----------+
|iso_region|
+----------+
|        DC|
+----------+



verificando quais aeroportos estão nos Estados não identificados

In [13]:
# Filtra os aeroportos com base nos estados excedentes
df_excess_airports = df_airports_region.filter(col("iso_region").isin([row.iso_region for row in excess_states.collect()]))

# Exibe os aeroportos nos estados excedentes
df_excess_airports.show(truncate=False)

+-------------+-----------------------------------------+-----------+----------+---------+
|type         |name                                     |iso_country|iso_region|iata_code|
+-------------+-----------------------------------------+-----------+----------+---------+
|large_airport|Ronald Reagan Washington National Airport|US         |DC        |DCA      |
+-------------+-----------------------------------------+-----------+----------+---------+



A partir de consultas na internet, verificamos que este aeroporto fica em Washginton. Portanto, a iso_region deve ser alterado para WA, sigla do Estado.

In [14]:
# Substitui o valor "DC" por "WA" na coluna "iso_region"
df_airports_region = df_airports_region.withColumn("iso_region", when(col("iso_region") == "DC", "WA").otherwise(col("iso_region")))

# Exibe os aeroportos após a substituição
df_airports_region.select("iso_region").distinct().count()

50

## 2.2. Transformação

### 2.2.1. Tabela df_airports_region

Transformação da sigla dos estados para o nome por extenso para corresponder aos valores da tabela df_weather

In [26]:
state_full_name = {
    'AL': 'Alabama',
    'AK': 'Alaska',
    'AZ': 'Arizona',
    'AR': 'Arkansas',
    'CA': 'California',
    'CO': 'Colorado',
    'CT': 'Connecticut',
    'DE': 'Delaware',
    'FL': 'Florida',
    'GA': 'Georgia',
    'HI': 'Hawaii',
    'ID': 'Idaho',
    'IL': 'Illinois',
    'IN': 'Indiana',
    'IA': 'Iowa',
    'KS': 'Kansas',
    'KY': 'Kentucky',
    'LA': 'Louisiana',
    'ME': 'Maine',
    'MD': 'Maryland',
    'MA': 'Massachusetts',
    'MI': 'Michigan',
    'MN': 'Minnesota',
    'MS': 'Mississippi',
    'MO': 'Missouri',
    'MT': 'Montana',
    'NE': 'Nebraska',
    'NV': 'Nevada',
    'NH': 'New Hampshire',
    'NJ': 'New Jersey',
    'NM': 'New Mexico',
    'NY': 'New York',
    'NC': 'North Carolina',
    'ND': 'North Dakota',
    'OH': 'Ohio',
    'OK': 'Oklahoma',
    'OR': 'Oregon',
    'PA': 'Pennsylvania',
    'RI': 'Rhode Island',
    'SC': 'South Carolina',
    'SD': 'South Dakota',
    'TN': 'Tennessee',
    'TX': 'Texas',
    'UT': 'Utah',
    'VT': 'Vermont',
    'VA': 'Virginia',
    'WA': 'Washington',
    'WV': 'West Virginia',
    'WI': 'Wisconsin',
    'WY': 'Wyoming'
}

# Criar a nova coluna com nomes de estados por extenso
df_airports_region = df_airports_region.withColumn("STATE_FULL_NAME", col("iso_region").cast("string")).replace(state_full_name, subset="STATE_FULL_NAME")

# Exibir o DataFrame resultante
df_airports_region.limit(5).toPandas()

Unnamed: 0,type,name,iso_country,iso_region,iata_code,STATE_FULL_NAME
0,small_airport,Ocean Reef Club Airport,US,FL,OCA,Florida
1,small_airport,Pilot Station Airport,US,AK,PQS,Alaska
2,small_airport,Crested Butte Airpark,US,CO,CSE,Colorado
3,small_airport,LBJ Ranch Airport,US,TX,JCY,Texas
4,small_airport,Nunapitchuk Airport,US,AK,NUP,Alaska


Retiraremos todos os espaços antes e depois das strings que utilizaremos para join e as deixaremos em snakecase.

In [28]:
df_airports_region = df_airports_region.withColumn("STATE_FULL_NAME", trim("STATE_FULL_NAME"))
df_airports_region = df_airports_region.withColumn("STATE_FULL_NAME", lower("STATE_FULL_NAME"))
df_airports_region = df_airports_region.withColumn("iata_code", trim("iata_code"))
df_airports_region = df_airports_region.withColumn("iata_code", lower("iata_code"))
df_airports_region.limit(5).toPandas()

Unnamed: 0,type,name,iso_country,iso_region,iata_code,STATE_FULL_NAME
0,small_airport,Ocean Reef Club Airport,US,FL,oca,florida
1,small_airport,Pilot Station Airport,US,AK,pqs,alaska
2,small_airport,Crested Butte Airpark,US,CO,cse,colorado
3,small_airport,LBJ Ranch Airport,US,TX,jcy,texas
4,small_airport,Nunapitchuk Airport,US,AK,nup,alaska


### 2.2.2. Tabela airline_delay

Retiraremos todos os espaços antes e depois das strings que utilizaremos para join e as deixaremos em snakecase.

In [39]:
df_airline_delay = df_airline_delay.withColumn("ORIGIN", trim("ORIGIN"))
df_airline_delay = df_airline_delay.withColumn("ORIGIN", lower("ORIGIN"))
df_airline_delay.select("ORIGIN").limit(5).toPandas()

Unnamed: 0,ORIGIN
0,bos
1,bna
2,dca
3,rdu
4,cmh


Criando colunas de ano e mês a partir da FL_DATE na tabela df_airline_delay

In [41]:
df_airline_delay = df_airline_delay.withColumn("Year", year("FL_DATE")).withColumn("Month", date_format(col("FL_DATE"), "MMMM"))
df_airline_delay.select("FL_DATE","Year", "Month").limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,FL_DATE,Year,Month
0,2011-01-01,2011,January
1,2011-01-01,2011,January
2,2011-01-01,2011,January
3,2011-01-01,2011,January
4,2011-01-01,2011,January


### 2.2.3. Tabela Weather

Variável State para snakecase e retirada de eventuais espaços antes e/ou depois da string

In [30]:
df_weather = df_weather.withColumn("State", trim("State"))
df_weather = df_weather.withColumn("State", lower("State"))
df_weather.limit(5).toPandas()

Unnamed: 0,State,Year,Month,tavg,pcp
0,california,2011,August,75.2,0.1
1,illinois,2011,August,74.8,1.87
2,arkansas,2011,January,37.6,1.6
3,california,2011,February,43.4,3.34
4,connecticut,2011,September,66.4,7.78


## 3.. Join nas tabelas


### 2.3.2. Join nas tabelas df_airport_region e df_airline_delay

In [32]:
df_airline_delay_airports = df_airline_delay.join(df_airports_region, df_airline_delay.ORIGIN == df_airports_region.iata_code, "inner")

df_airline_delay_airports.count()

6038444

### 3.1.3. Join nas tabelas df_airline_delay_airports e df_weather

Criando dataframe que será utilizado no pipeline

In [38]:
df = df_airline_delay_airports.join(df_weather, (df_airline_delay_airports.STATE_FULL_NAME == df_weather.State) & (df_airline_delay_airports.Year == df_weather.Year) & (df_airline_delay_airports.Month == df_weather.Month), "inner")

df.limit(5).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,name,iso_country,iso_region,iata_code,STATE_FULL_NAME,State,Year,Month,tavg,pcp
0,2011-01-01,OO,6486,mmh,SFO,1736,,,,,...,Mammoth Yosemite Airport,US,CA,mmh,california,california,2011,January,45.2,1.28
1,2011-01-01,WN,286,isp,BWI,945,945.0,0.0,9.0,954.0,...,Long Island Mac Arthur Airport,US,NY,isp,new york,new york,2011,January,18.6,2.01
2,2011-01-01,WN,1833,isp,BWI,820,826.0,6.0,6.0,832.0,...,Long Island Mac Arthur Airport,US,NY,isp,new york,new york,2011,January,18.6,2.01
3,2011-01-01,WN,1988,isp,BWI,1205,1303.0,58.0,8.0,1311.0,...,Long Island Mac Arthur Airport,US,NY,isp,new york,new york,2011,January,18.6,2.01
4,2011-01-01,WN,1990,isp,BWI,1545,1546.0,1.0,7.0,1553.0,...,Long Island Mac Arthur Airport,US,NY,isp,new york,new york,2011,January,18.6,2.01


Verificando se todos os valores corresponderam entre as duas tabelas

In [33]:
df_erro = df_airline_delay_airports.join(df_weather, (df_airline_delay_airports.STATE_FULL_NAME == df_weather.State) & (df_airline_delay_airports.Year == df_weather.Year) & (df_airline_delay_airports.Month == df_weather.Month), "left")

In [42]:
#Verificando falhas no Join

unmatched_rows = df_erro.filter(col("State").isNull())

# Contagem das linhas não correspondidas
unmatched_rows.count()

101304

Considerando que a base possui mais de 6 milhões de registros, não aprofundaremos em entender as 101304 que não corresponderam entre as tabelas.

## <font color = red> Retirar colunas que não fazem sentido do DF <font>

In [125]:
df = df.withColumn('WEATHER_DELAY_BIN', when(col('WEATHER_DELAY') == 0, 0).otherwise(1))

# 3. FEATURE ENGINEERING

## 3.1. Criação de colunas

### 3.1.1 Cassificando atrasos

In [62]:
# Fazendo variável com valor dos percentis 0.33 e 0.66 para posteriormente segmentar as distâncias.
# documentação consultada: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.approxQuantile.html

# percentis = df.stat.approxQuantile("DISTANCE", [0.33, 0.66], 0.01)

In [63]:
# Criando coluna no dataframe para classificação das distâncias a partir dos percentis calculados
# Foi utilizado o percentis[0] para acessar os valor do percentil 0.33 e percentis[1] para acessar o valor de percentis 0.66.
''' Foi necessário utilizar o recurso de format ao invés de utilizar diretamente a referência do objetos da variável percentis no corpo da expressão porque, de outra forma, o 
PySpark interpretou percentis[0] e percentis[1] como literais de string, e não como valores da lista criada na variável percentis'''

# df_classificacao_distancias = df.withColumn(
#     'CLASSIFICACAO_DISTANCIAS',
#     expr(
#         "CASE WHEN distance <= {} THEN 'proximos'"
#         " WHEN distance <= {} THEN 'medio'"
#         " ELSE 'distantes' END".format(percentis[0], percentis[1])
#     )
# )

# df_classificacao_distancias.show(10)

                                            

' Foi necessário utilizar o recurso de format ao invés de utilizar diretamente a referência do objetos da variável percentis no corpo da expressão porque, de outra forma, o \nPySpark interpretou percentis[0] e percentis[1] como literais de string, e não como valores da lista criada na variável percentis'

# 4 Análise Descritiva

## 4.1. Análise Univariada

### 4.1.1. Variáveis numéricas

<font color=red> criar aqui gráficos para visualizarmos a dispersão dos dados em boxplot para as colunas numéricas pertinentes <font>

### 4.1.2. Variáveis categóricas

<font color=red> criar aqui gráficos para visualizarmos a frequência das categorias para cada variável <font>

# 5. Análise Exploratória

## 5.1 Criação de hipóteses

<font color = red> criar aqui hipóteses subjetivas (de acordo com nossa interpretação subjetivas sobre os dados) de como as variáveis se comportam frente à variável resposta <font>
    
    Por exemplo: 
    1. Vôos entre aeroportos mais distantes tendem a ter mais atrasos por 'weather'
    2. O número de vôos cancelados por motivos de 'weather' é maior em meses que possuem maior precipitação



## 5.2. Análise Bivariada

<font color=red> criar gráficos buscando responder as hipóteses criadas 
    <font>

Isso é útil para criarmos uma intuição sobre a importância das variáveis nos modelos e obter maior autonomia para manipulação, evitando que nos tornemos totalmente dependente somente de algorítmos para seleção de variáveis, por exemplo.

### 5.2.1. Calculando % atraso em cada faixa

<font color = red> necessita ajuste para pegar apenas vôos cancelados por motivo de 'weather'

In [64]:
# frequencia_absoluta = df_classificacao_distancias.groupBy("CLASSIFICACAO_DISTANCIAS").count()
# total_atrasos = df_classificacao_distancias.count()

# frequencia_absoluta.show()
# total_atrasos

In [65]:
# porcentagem_atrasos_por_classificacao_distancia = frequencia_absoluta.withColumn(
#     "PORCENTAGEM",
#     expr("count / {} * 100".format(total_atrasos))
# )

# porcentagem_atrasos_por_classificacao_distancia.show()

### 5.1.2. Número de vôos diários

In [66]:
# # Plotando gráfico com o número de vôos diários

# plt.figure(figsize=(15,8))
# ax = sns.lineplot(data=df_BOS_agrupado_data_pandas, x='FL_DATE', y='count')
# ax.set_xticklabels(ax.get_xticklabels(), rotation=90, ha='right')
# ax.set(title='Número Diário de Voos',
#        xlabel='Data',
#        ylabel='Número de Voos')

# plt.show()

# 5.2. Análise Multivariada

<font color= red> plotar aqui gráfico de correlação <font>

# 6. Modelagem de Machine Learning

In [152]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

In [131]:
cols = ['ORIGIN', 'DEST', 'State', 'tavg', 'pcp', 'WEATHER_DELAY_BIN']
df_ml = df.select(*cols)

df_ml_sample = df_ml.sample(fraction=0.1, seed=3)

cat_columns = ['ORIGIN', 'DEST', 'State']
num_columns = ['tavg', 'pcp']

index_output = [x+"Index" for x in cat_columns]
ohe_output = [x+"OHE" for x in cat_columns]

In [132]:
stringIndexer = StringIndexer(inputCols=cat_columns,
                             outputCols=index_output,
                             handleInvalid="skip")

oheEncoder = OneHotEncoder(inputCols=index_output,
                          outputCols=ohe_output)

# Montar os recursos em um vetor
assembler = VectorAssembler(inputCols=assemblerInputs,
                     outputCol="features")

In [133]:
# Dividir os dados em conjuntos de treinamento e teste
(train_data, test_data) = df_ml_sample.randomSplit([0.8, 0.2], seed=1234)

# Criar instâncias dos modelos de Regressão Logística e Floresta Aleatória


## 6.1 Regressão Logística

In [147]:
lr = LogisticRegression(featuresCol='features', labelCol='WEATHER_DELAY_BIN')
pipeline_lr = Pipeline(stages=[stringIndexer, oheEncoder, assembler, lr])

model_lr = pipeline_lr.fit(train_data)

predictions_lr = model_lr.transform(test_data)

evaluator_lr = BinaryClassificationEvaluator(labelCol='WEATHER_DELAY_BIN')

evaluation_lr = evaluator_lr.evaluate(predictions_lr)

print(f'AUC:{evaluation_lr}')

AUC:0.9999999457990196


## 6.2 Floresta Aleatória

In [148]:
rf = RandomForestRegressor(featuresCol='features', labelCol='WEATHER_DELAY_BIN')
pipeline_rf = Pipeline(stages=[stringIndexer, oheEncoder, assembler, rf])

model_rf = pipeline_rf.fit(train_data)

predictions_rf = model_rf.transform(test_data)

evaluator_rf = RegressionEvaluator(labelCol="WEATHER_DELAY_BIN", predictionCol="prediction")

evaluation_rf = evaluator_rf.evaluate(predictions_rf)

print(f'RMSE:{evaluation_rf}')

RMSE:0.08993337109305


## 6.3 Floresta aleatória com otimização de hiperparâmetros e validação cruzada

In [150]:
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build)

In [154]:
cv = CrossValidator(
    estimator=pipeline_rf,
    evaluator=evaluator_rf,
    estimatorParamMaps=paramGrid,
    numFolds=5,
    seed=42
)

cvModel = cv.fit(train_data)

TypeError: object of type 'method' has no len()